user3408678 user3408678 -4 years ago 103
C Question

Asynchronous I/O reading from a file

I've recently had ideas for several projects that all involve reading IP addresses from a file. Since they are all supposed to handle a large number of hosts, I've tried to get some concurrency, and with it better performance, either by multi-threading or by creating a pool of sockets and select()-ing on them. On multiple occasions, reading from the file has seemed to be the bottleneck. As I understand it, reading from a file with fgets or similar is a synchronous, blocking operation. So even if I successfully implemented a client that connects to multiple hosts asynchronously, the overall operation would still be synchronous, because I can only read one address at a time from the file.

/* partially pseudo code */

/* getaddrinfo() stuff here */

while (fgets(ip, sizeof(ip), file)) {
FD_ZERO(&readfds);
/* create n sockets here in a for loop */
for (i = 0; i < socket_num; i++) {
if (newfd > fd[i]) newfd = fd[i];
FD_SET(fd[i], &readfds);
}

/* here's where I think I should connect n sockets to n addresses from file
* but I'm only getting one IP at a time from file, so I'm not sure how to connect to
* n addresses at once with fgets
*/

for (j = 0; j < socket_num; j++) {
if ((connect(socket, ai->ai_addr, ai->ai_addrlen)) == -1)
// error
else {
freeaddrinfo(ai);
FD_SET(socket, &master);
fdmax = socket;
if (select(socket+1, &master, NULL, NULL, &tv) == -1)
// error
if ((recvd = read(socket, banner, RECVD)) <= 0)
// error
if (FD_ISSET(socket, &master))
// print success
}
/* clear sets and close sockets and stuff */
}


I've pointed out my issues with comments, but just to clarify: I'm not sure how to perform asynchronous I/O operations on multiple target servers read from a file, since reading entries from a file seems to be strictly synchronous. I've run into similar issues with multithreading, with a marginally better degree of success.

void *function_passed_to_pthread_create(void *opts)
{
while (fgets(ip_addr, sizeof(ip_addr), opts->file)) {
/* speak to ip_addr and get response */
}
}

main()
{
/* necessary stuff */
for (i = 0; i < thread_num; i++) {
pthread_create(&tasks[i], NULL, function_passed_to_pthread_create, opts);
}
for (j = 0; j < thread_num; j++)
/* join threads */
return 0;
}


This seems to work, but since multiple threads are all processing the same file the results aren't always accurate. I imagine it's because multiple threads may process the same address from file at the same time.

I've considered loading all the entries from the file into an array in memory, but if the file were particularly large I imagine that could cause memory issues. On top of that, I'm not sure if that even makes sense to do anyway.

As a final note, if the file I'm reading from happens to be particularly large, with a huge number of IPs, then I do not believe either solution scales well. Anything is possible with C though, so I imagine there is some way to achieve what I'm hoping to.

To sum this post up: I'd like to find a way to improve a client-side application's performance using asynchronous I/O or multi-threading when reading entries from a file.

Answer Source

Several people have hinted at a good solution to this in their comments, but it's probably worth spelling it out in more detail. The full solution has quite a lot of details and is pretty complicated code, so I'm going to use pseudocode to explain what I'd recommend.

What you have here is really a variation on a classic producer/consumer problem: You have a single thing producing data, and many things trying to consume that data. In your case, it must be a "single thing" producing that data, because the length of each line of the source file is unknown: You can't just jump forward 'n' bytes and somehow be at the next IP. There can only be one actor at a time moving the read pointer toward the next unknown position of the \n, so you by definition have a single producer.

There are three general ways to attack this:

  • Solution A involves having each thread pulling a little more out of a shared file buffer, and kicking off an asynchronous (nonblocking) read every time the last read completes. There are a whole host of headaches getting this solution right, as it's very sensitive to timing differences between the filesystem and the work being performed: If the file reads are slow, the workers will all stall waiting for the file. If the workers are slow, the file reader will either stall or fill up memory waiting for them to consume the data. This solution is likely the absolute fastest, but it's also incredibly difficult synchronization code to get right with about a zillion caveats. Unless you're an expert in threading (or extremely clever abuse of epoll_wait()), you probably don't want to go this route.

  • Solution B has a "master" thread, responsible for reading the file, and populating some kind of thread-safe queue with the data it reads, with one IP address (one string) per queue entry. Each of the worker threads just consumes queue entries as fast as it can, querying the remote server and then requesting another queue entry. This requires a little care to get right, but is generally a lot safer than Solution A, especially if you use somebody else's queue implementation.

  • Solution C is pretty hacktastic, but you shouldn't dismiss it out-of-hand, depending on what you're doing. This solution just involves using something like the Un*x sed command (see Get a range of lines from a file given the start and end line numbers) to slice your source file into a bunch of "chunky" source files in advance — say, twenty of them. Then you just run twenty copies of a really simple single-thread program in parallel using &, each on a different "slice" of file. Mushed together with a little shell script to automate it, this can be a "good enough" solution for a lot of needs.
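
If you'd rather do the slicing for Solution C in C than with sed, something like the following is a minimal sketch. Note that it swaps in a slightly different technique: it deals lines out round-robin instead of cutting contiguous line ranges, which avoids having to count the lines first. The function name split_round_robin and the "<source>.<n>" output naming are my own assumptions, not anything standard.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: deal the lines of src_path round-robin into n
   files named "<src_path>.0", "<src_path>.1", and so on.
   Returns the number of lines written, or -1 on any I/O error. */
long split_round_robin(const char *src_path, int n)
{
    assert(src_path != NULL && n > 0);

    FILE *src = fopen(src_path, "r");
    if (src == NULL) return -1;

    FILE **out = calloc((size_t)n, sizeof *out);
    if (out == NULL) { fclose(src); return -1; }

    /* Open every slice up front; stop at the first failure. */
    long lines = 0;
    for (int i = 0; i < n && lines == 0; i++) {
        char name[4096];
        int len = snprintf(name, sizeof name, "%s.%d", src_path, i);
        if (len < 0 || (size_t)len >= sizeof name ||
            (out[i] = fopen(name, "w")) == NULL)
            lines = -1;
    }

    /* A line longer than buf arrives in several fgets() pieces; we only
       move on to the next slice once we have seen its '\n'. */
    char buf[256];
    int slice = 0, mid_line = 0;
    while (lines >= 0 && fgets(buf, sizeof buf, src) != NULL) {
        if (fputs(buf, out[slice]) == EOF) { lines = -1; break; }
        size_t len = strlen(buf);
        mid_line = (len == 0 || buf[len - 1] != '\n');
        if (!mid_line) {
            lines++;
            slice = (slice + 1) % n;
        }
    }
    if (lines >= 0 && mid_line) lines++;   /* last line had no '\n' */

    for (int i = 0; i < n; i++)
        if (out[i] != NULL && fclose(out[i]) != 0) lines = -1;
    free(out);
    fclose(src);
    return lines;
}
```

Calling split_round_robin("ips.txt", 20) would leave you with ips.txt.0 through ips.txt.19, and you can then start one copy of your single-threaded program per slice from the shell using &.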


Let's take a closer look at Solution B — a master thread with a thread-safe queue. I'm going to cheat and assume you can construct a working queue implementation (if not, there are StackOverflow articles on implementing a thread-safe queue using pthreads: pthread synchronized blocking queue).
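
If you do want to roll your own, a minimal sketch of such a queue might look like the one below. It is a fixed-size ring buffer of pointers guarded by one pthread mutex and two condition variables. The names create_queue, enqueue and dequeue match the pseudocode in this answer; the capacity parameter, the struct layout and destroy_queue are assumptions of mine, and the pseudocode passes the queue around by value where this sketch uses a pointer. Treat it as a starting point rather than a hardened implementation.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical bounded blocking queue of pointers (ring buffer). */
typedef struct {
    void **items;
    size_t capacity, head, count;
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
} queue;

queue *create_queue(size_t capacity)
{
    assert(capacity > 0);
    queue *q = malloc(sizeof *q);
    if (q == NULL) return NULL;
    q->items = malloc(capacity * sizeof *q->items);
    if (q->items == NULL) { free(q); return NULL; }
    q->capacity = capacity;
    q->head = q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
    return q;
}

/* Blocks while the queue is full, so a fast reader cannot fill memory. */
void enqueue(queue *q, void *item)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == q->capacity)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % q->capacity] = item;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty; returns NULL only if NULL was pushed. */
void *dequeue(queue *q)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    void *item = q->items[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return item;
}

/* Only call this once every thread has finished with the queue. */
void destroy_queue(queue *q)
{
    pthread_cond_destroy(&q->not_full);
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->lock);
    free(q->items);
    free(q);
}
```

The while loops around pthread_cond_wait() matter: condition variables can wake up spuriously, so each waiter has to re-check its condition after waking.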

In pseudocode, this solution is then something like this:

main()
{
    /* Create a queue. */
    queue = create_queue();

    /* Kick off the master thread to read the file, and give it the queue. */
    master_thread = pthread_create(master, queue);

    /* Kick off a bunch of workers with access to the queue. */
    for (i = 0; i < 20; i++) {
        worker_thread[i] = pthread_create(worker, queue);
    }

    /* Wait for everybody to finish. */
    pthread_join(master_thread);
    for (i = 0; i < 20; i++) {
        pthread_join(worker_thread[i]);
    }
}

void master(queue q)
{
    FILE *fp = fopen("ips.txt", "r");
    char buffer[BIGGER_THAN_ANY_IP];

    /* Inhale the file as fast as we can, and push each line we
       read onto the queue. */
    while (fgets(buffer, sizeof(buffer), fp) != NULL) {
        char *next_ip = strdup(buffer);
        enqueue(q, next_ip);
    }

    /* Add some final messages in the queue to let the workers
       know that we're out of data.  There are *much* better ways
       of notifying them that we're "done", but in this case,
       pushing a bunch of NULLs equal to the number of threads is
       simple and probably good enough. */
    for (i = 0; i < 20; i++) {
        enqueue(q, NULL);
    }
}

void worker(queue q)
{
    char *ip;

    /* Inhale messages off the queue as fast as we can until
       we get a "NULL", which means that it's time to stop.
       The call to dequeue() *must* block if there's nothing
       in the queue; the call should only return NULL if the
       queue actually had NULL pushed into it. */
    while ((ip = dequeue(q)) != NULL) {

        /* Insert code to actually do the work here. */
        connect_and_send_and_receive_to(ip);
    }
}
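
One thing the pseudocode glosses over is the real pthreads calling convention: pthread_create() takes a pointer to a pthread_t, an attributes pointer, a function of the form void *fn(void *), and a single void * argument, and it returns an error code rather than the thread. Here is a small sketch of the start-then-join pattern with the real signatures. The names run_workers, count_worker and struct counter are hypothetical, and the worker only bumps a shared counter as a stand-in for the dequeue-and-connect work.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical shared state handed to every worker. */
struct counter {
    pthread_mutex_t lock;
    int value;
};

/* Thread entry points must have exactly this shape: void *fn(void *). */
void *count_worker(void *arg)
{
    struct counter *c = arg;
    pthread_mutex_lock(&c->lock);
    c->value++;               /* stand-in for "dequeue and talk to an IP" */
    pthread_mutex_unlock(&c->lock);
    return NULL;
}

/* Start n threads running fn(arg), then join every thread that started.
   Returns 0 on success, -1 if out of memory, or the first pthread
   error code encountered. */
int run_workers(int n, void *(*fn)(void *), void *arg)
{
    assert(n > 0);
    pthread_t *tids = malloc((size_t)n * sizeof *tids);
    if (tids == NULL) return -1;

    int started = 0, err = 0;
    while (started < n) {
        err = pthread_create(&tids[started], NULL, fn, arg);
        if (err != 0) break;
        started++;
    }
    for (int i = 0; i < started; i++) {
        int jerr = pthread_join(tids[i], NULL);
        if (err == 0) err = jerr;
    }
    free(tids);
    return err;
}
```

In the master/worker design, arg would be the shared queue, and the master thread would be started the same way with its own entry function.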

There are plenty of caveats and details in a real implementation (like: how do we implement the queue, ring buffers or a linked list? what if the text isn't all IPs? what if the char buffer isn't big enough? how many threads is enough? how do we deal with file or network errors? will malloc performance become a bottleneck? what if the queue gets too big? can we do better to overlap the network I/O?).

But, caveats and details aside, the pseudocode I presented above is a good enough starting point that you likely can expand it into a working solution.
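
To give one example of handling those details: fgets() leaves the trailing newline in the buffer, and not every line is guaranteed to be an IP. A minimal sketch of a check the master could run on each line before enqueueing it is below. The function name clean_and_check_ip is hypothetical; inet_pton() is the standard POSIX call, and it only accepts numeric addresses, so hostnames would be rejected. Leading whitespace is not handled here.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

/* Hypothetical helper: strip trailing whitespace from line in place
   (fgets keeps the "\n"), then return 1 if what remains is a valid
   IPv4 or IPv6 literal and 0 otherwise. */
int clean_and_check_ip(char *line)
{
    assert(line != NULL);

    size_t len = strlen(line);
    while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r' ||
                       line[len - 1] == ' '  || line[len - 1] == '\t'))
        line[--len] = '\0';

    /* Big enough for either address family. */
    unsigned char addr[sizeof(struct in6_addr)];
    if (inet_pton(AF_INET, line, addr) == 1) return 1;
    if (inet_pton(AF_INET6, line, addr) == 1) return 1;
    return 0;
}
```

In master(), you would call this on buffer right after fgets() and only strdup() and enqueue the line when it returns 1.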
