IAmYourFaja - 1 month ago
Java Question

Multithreading a massive file read

I'm still in the process of wrapping my brain around how concurrency works in Java. I understand that (if you're subscribing to the OO Java 5 concurrency model) you implement a Task or Callable with a run() or call() method (respectively), and it behooves you to parallelize as much of that implemented method as possible.

But I'm still not understanding something inherent about concurrent programming in Java:


  • How is a Task's run() method assigned the right amount of concurrent work to be performed?



As a concrete example, suppose I have an I/O-bound readMobyDick() method that reads the entire contents of Herman Melville's Moby Dick into memory from a file on the local system. And let's just say I want this readMobyDick() method to be concurrent and handled by 3 threads, where:


  • Thread #1 reads the first 1/3rd of the book into memory

  • Thread #2 reads the second 1/3rd of the book into memory

  • Thread #3 reads the last 1/3rd of the book into memory



Do I need to chunk Moby Dick up into three files and pass them each to their own task, or do I just call readMobyDick() from inside the implemented run() method and (somehow) the Executor knows how to break the work up amongst the threads?


I am a very visual learner, so any code examples of the right way to approach this are greatly appreciated! Thanks!

Answer

You have probably chosen by accident the absolute worst example of parallel activities!

Reading in parallel from a single mechanical disk is actually slower than reading with a single thread, because the mechanical head has to bounce between different sections of the disk as each thread gets its turn to run. This is best left as a single-threaded activity.
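For the file itself, a single-threaded read could look roughly like the sketch below. The class name SingleThreadedRead, the helpers readWords and splitWords, and the whitespace-based word splitting are all assumptions made for illustration; they are not part of the question.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SingleThreadedRead {
    // Splits each line on runs of whitespace and collects the non-empty words.
    static List<String> splitWords(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
        }
        return words;
    }

    // Reads the whole file on the calling thread, then splits it into words.
    static List<String> readWords(Path file) throws IOException {
        return splitWords(Files.readAllLines(file, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical demo data: write a tiny temp file, then read it back.
        Path file = Files.createTempFile("moby", ".txt");
        Files.write(file, Arrays.asList("Call me Ishmael.", "Some years ago"),
                StandardCharsets.UTF_8);
        System.out.println(readWords(file).size()); // prints 6
        Files.delete(file);
    }
}
```

Once the words are in memory, the list can be handed to several threads for CPU-side work without any further disk access.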

Let's take another example, which is similar to yours but can actually offer some benefit: assume I want to search for the occurrences of a certain word in a huge list of words (this list could even have come from a disk file, but like I said, read by a single thread). Assume I can use 3 threads like in your example, each searching on 1/3rd of the huge word list and keeping a local counter of how many times the searched word appeared.

In this case you'd want to partition the list into 3 parts, pass each part to a different object whose type implements Runnable, and implement the search in its run() method.

The runtime itself has no idea how to do the partitioning; you have to specify it yourself. There are many other partitioning strategies, each with its own strengths and weaknesses, but we can stick to static partitioning for now.

Let's see some code:

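import java.util.ArrayList;
import java.util.List;

class SearchTask implements Runnable {
     private int localCounter = 0; // matches found in this task's range
     private final int start;      // start index of search (inclusive)
     private final int end;        // end index of search (exclusive)
     private final List<String> words;
     private final String token;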

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish (join() throws InterruptedException,
// so the enclosing method must declare or handle it)
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

This should work nicely. Note that in practical cases you would build a more generic partitioning scheme. You could alternatively use an ExecutorService and implement Callable instead of Runnable if you wish to return a result.

Here is an alternative example using those more advanced constructs:

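import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SearchTask implements Callable<Integer> {
     private int localCounter = 0; // matches found in this task's range
     private final int start;      // start index of search (inclusive)
     private final int end;        // end index of search (exclusive)
     private final List<String> words;
     private final String token;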

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and run the tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
// invokeAll blocks until every task has completed (throws InterruptedException)
List<Future<Integer>> results = exec.invokeAll(tasks);

// collect results (get() throws ExecutionException if a task failed)
int counter = 0;
for(Future<Integer> f: results) {
    counter += f.get();
}

// release the pool's threads once the work is done
exec.shutdown();
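
As for the "more generic partitioning scheme" mentioned earlier, one possible sketch is below. It computes the index ranges from the list size and the number of parts instead of hard-coding them. The class name PartitionedSearch, the count method, and the sample data are my own illustrative choices. It assumes Java 8 or later (for the lambda) and that parts is at least 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionedSearch {
    // Counts occurrences of token in words by splitting the list into
    // `parts` contiguous index ranges of near-equal size (static partitioning).
    static int count(List<String> words, String token, int parts) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        int size = words.size();
        for (int p = 0; p < parts; p++) {
            // Integer arithmetic spreads any remainder across the ranges;
            // the long cast avoids overflow for very large lists.
            final int start = (int) ((long) size * p / parts);
            final int end = (int) ((long) size * (p + 1) / parts);
            tasks.add(() -> {
                int local = 0; // each task keeps its own counter
                for (int i = start; i < end; i++) {
                    if (words.get(i).equals(token)) local++;
                }
                return local;
            });
        }
        ExecutorService exec = Executors.newFixedThreadPool(parts);
        try {
            int total = 0;
            // invokeAll blocks until all tasks are done
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while searching", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("search task failed", e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "John", "Paul", "John", "George", "Ringo", "John", "Pete");
        System.out.println(count(words, "John", 3)); // prints 3
    }
}
```

The tasks only read the shared list and each writes to its own local counter, so no locking is needed; the partial counts are combined on the calling thread.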