Wavyx Wavyx - 24 days ago 17
Groovy Question

How to get multiple async results within a given timeout with GPars?

I'd like to retrieve multiple "costly" results using parallel processing but within a specific timeout.

I'm using GPars Dataflow.task but it looks like I'm missing something as the process returns only when all dataflow variable are bound.

def timeout = 500
def mapResults = []
GParsPool.withPool(3) {
def taskWeb1 = Dataflow.task {
mapResults.web1 = new URL('http://web1.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb2 = Dataflow.task {
mapResults.web2 = new URL('http://web2.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
def taskWeb3 = Dataflow.task {
mapResults.web3 = new URL('http://web3.com').getText()
}.join(timeout, TimeUnit.MILLISECONDS)
}


I did see in the GPars Timeouts doc a way to use Select to get the fastest result within the timeout.
But I'm looking for a way to retrieve as much as possible results in the given time frame.

Is there a better "GPars" way to achieve this?
Or with Java 8 Future/Callable ?

Answer

Since you're interested in Java 8 based solutions too, here's a way to do it:

int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, CompletableFuture<String>> map = 
        Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
            .collect(
                Collectors.toMap(
                    // the key will be the URL
                    Function.identity(),
                    // the value will be the CompletableFuture text fetched from the url
                    (url) -> CompletableFuture.supplyAsync(
                        () -> readUrl(url, timeout), 
                        executorService
                    )
                )
            );
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        CompletableFuture<String> future = entry.getValue();
        boolean completed = future.isDone() 
                && !future.isCompletedExceptionally() 
                && !future.isCancelled(); 
        System.out.printf("url %s completed: %s, error: %s, result: %.100s\n",
            entry.getKey(),
            completed, 
            future.isCompletedExceptionally(),
            completed ? future.getNow(null) : null);
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}

This will give you as many Futures as URLs you have, but gives you an opportunity to see if any of the tasks failed with an exception. The code could be simplified if you're not interested in these exceptions, only the contents of successful retrievals:

int timeout = 250;
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
    Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
    Stream.of("http://google.com", "http://yahoo.com", "http://bing.com")
        .forEach(url -> {
            CompletableFuture
                .supplyAsync(
                    () -> readUrl(url, timeout), 
                    executorService
                ).thenAccept(content -> map.put(url, content));
        });
    executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);

    //print the resulting map, cutting the text at 100 chars
    map.entrySet().stream().forEach(entry -> {
        System.out.printf("url %s completed, result: %.100s\n",
            entry.getKey(), entry.getValue() );
    });
} catch (InterruptedException e) {
    //rethrow
} finally {
    executorService.shutdownNow();
}

Both of the codes will wait for about 250 milliseconds (it will take only a tiny bit more because of the submissions of the tasks to the executor service) before printing the results. I found about 250 milliseconds is the threshold where some of these url-s can be fetched on my network, but not necessarily all. Feel free to adjust the timeout to experiment.

For the readUrl(url, timeout) method you could use a utility library like Apache Commons IO. The tasks submitted to the executor service will get an interrupt signal even if you don't explicitely take into account the timeout parameter. I could provide an implementation for that but I believe it's out of scope for the main issue in your question.

Comments