alessandrocaprarelli alessandrocaprarelli - 1 year ago 77
Java Question

Java concurrency exercise. Asynchronous download

I'm doing an exercise about Java concurrency using wait, notify to study for an exam.
The exam will be written, so the code does have to be perfect since we can't try to compile and check errors.

This is the text of the exercise:

ex
General idea:


  • when the downloader is instanced the queue and the hashmap are created and passed to all the threads. (shared data)

  • the download method add the url to the queue and call notifyAll to wake up the Downloader Threads.

  • the getData method waits until there are data in the hashmap for the provided url. When data are available it returns to the caller.

  • the downloader thread runs an infinity loop. It waits until an url is present in the queue. When it receives an url it downloads it and puts the bytes in the hashmap calling notifyAll to wake up a possible user waiting in getData method.



This is the code that I produced:

public class Downloader{

private Queue downloadQueue;
private HashMap urlData;
private final static THREADS_NUMBER;

public Downloader(){
this.downloadQueue = new Queue();
this.urlData = new HashMap();
for(int i = 0; i < THREADS_NUMBER; i++){
new DownTh(this.downloadQueue, this.urlData).start();
}
}

void syncronized download(String URL){
downloadQueue.add(url);
notifyAll();
return;
}

byte[] syncronized getData(String URL){
while(urlData.get(URL) == null ){
wait()
}

return urlData.get(URL);
}
}

public class DownTh extend Thread{

private Queue downloadQueue;
private HashMap urlData;

public DownTh(Queue downloadQueue, HashMap urlData){
this.downloadQueue = downloadQueue
this.urlData = urlData;
}

public void run(){
while(true){
syncronized{
while(queue.isEmpty()){
wait()
}

String url = queue.remove();
urlData.add(url, Util.download(url))

notifyAll()
}
}
}
}


Can you help me and tell me if the logic is right?

Answer Source

Let's assume for a second that all those great classes in Java that handle synchronization do not exist, because this is a synthetic task, and all you got to handle is sychronized, wait and notify.

The first question to answer in simple words is: "Who is going to wait on what?"

  • The download thread is going to wait for an URL to download.
  • The caller is going to wait for the result of that download thread.

What does this mean in detail? We need at least one synchronization element between the caller and the download thread (your urlData), also there should be one data object handling the download data itself for convenience, and to check whether or not the download has yet been completed.

So the detailed steps that will happen are:

  1. Caller requests new download.
    create: DownloadResult
    write: urlData(url -> DownloadResult)
    wake up 1 thread on urlData.

  2. Thread X must find data to download and process it or/then fall asleep again.
    read: urlData (find first unprocessed DownloadResult, otherwise wait on urlData)
    write: DownloadResult (acquire it)
    write: DownloadResult (download result)
    notify: anyone waiting on DownloadResult
    repeat

  3. Caller must be able to asynchronously check/wait for download result.
    read: urlData
    read: DownloadResult (wait on DownloadResult if required)

As there are reads and writes from different threads on those objects, synchronization is required when accessing the objects urlData or DownloadResult.

Also there will be a wait/notify association:

  • caller -> urlData -> DownTh
  • DownTh -> DownloadResult -> caller

After careful analysis the following code would fulfill the requirements:

public class DownloadResult {

  protected final URL url; // this is for convenience
  protected boolean inProgress;
  protected byte[] result;

  public DownloadResult(final URL url) {
    this.url = url;
    this.inProgress = false;
  }

  /* Try to lock this against tother threads if not already acquired. */
  public synchronized boolean acquire() {
    if (this.inProgress == false) {
      this.inProgress = true;
      return true;
    } else {
      return false;
    }
  }

  public void download() {
    final byte[] downloadedBytes = Util.download(this.url); // note how this is done outside the synchronized block to avoid unnecessarily long blockings
    synchronized (this) {
      this.result = downloadedBytes;
      this.notifyAll(); // wake-up ALL callers
    }
  }

  public synchronized byte[] getResult() throws InterruptedException {
    while (this.result == null) {
      this.wait();
    }
    return this.result;
  }

}

protected class DownTh extends Thread {

  protected final Map<URL, DownloadResult> urlData;

  public DownTh(final Map<URL, DownloadResult> urlData) {
    this.urlData = urlData;
    this.setDaemon(true); // this allows the JVM to shut down despite DownTh threads still running
  }

  protected DownloadResult getTask() {
    for (final DownloadResult downloadResult : urlData.values()) {
      if (downloadResult.acquire()) {
        return downloadResult;
      }
    }
    return null;
  }

  @Override
  public void run() {
    DownloadResult downloadResult;
    try {
      while (true) {
        synchronized (urlData) {
          while ((downloadResult = this.getTask()) == null) {
            urlData.wait();
          }
        }
        downloadResult.download();
      }
    } catch (InterruptedException ex) {
      // can be ignored
    } catch (Error e) {
      // log here
    }
  }
}

public class Downloader {

  protected final Map<URL, DownloadResult> urlData = new HashMap<>();

  // insert constructor that creates the threads here

  public DownloadResult download(final URL url) {
    final DownloadResult result = new DownloadResult(url);
    synchronized (urlData) {
      urlData.putIfAbsent(url, result);
      urlData.notify(); // only one thread needs to wake up
    }
    return result;
  }

  public byte[] getData(final URL url) throws InterruptedException {
    DownloadResult result;
    synchronized (urlData) {
      result = urlData.get(url);
    }
    if (result != null) {
      return result.getResult();
    } else {
      throw new IllegalStateException("URL " + url + " not requested.");
    }
  }
}

In real Java things would be done differently, by using Concurrent classes and/or Atomic... classes, so this is just for educational purposes. For further reading see "Callable Future".

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download