user3350744 user3350744 - 3 months ago 8
Scala Question

How to spawn an unknown amount of Futures and combine the result even if one or more failed?

I want to turn the following sequential code into concurrent code with Futures and need advice on how to structure it.

sequential:

import java.net.URL

val providers = List(
new URL("http://www.cnn.com"),
new URL("http://www.bbc.co.uk"),
new URL("http://www.othersite.com")
)

def download(urls: URL*) = urls.flatMap(url => io.Source.fromURL(url).getLines).distinct

val res = download(providers:_*)


I want to download all sources that are coming in via the varargs of the download method and combine the results into one Seq/List/Set, whatever, together. When one Future failed, let's say because the server is unreachable, it should take all others and move on and return the result nonetheless. firstCompletedOf won't work because I need the results of all, except one failed due to error. I thought about using Future.sequence like below but I can't get it to work. Here is what I tried...

def download(urls: URL*) = Future.sequence {
urls.map { url =>
Future {
io.Source.fromURL(url).getLines
}
}
}


This produces a Seq[Future[Iterator[String]]] which is not compatible with M_[Future[A_]].

A Future[Iterator[String]] is what I want. (I thought I return an Iterator because I need to reuse it later on with reset method on Iterator.)

Answer

You can use parallel collections:

import java.net.URL

val providers = List(
  new URL("http://www.cnn.com"),
  new URL("http://www.bbc.co.uk"),
  new URL("http://www.othersite.com")
)

def download(urls: URL*) = urls.par.flatMap(url => {
  Try {
    io.Source.fromURL(url).getLines
  } match {
    case Success(e) => e
    case Failure(_) => Seq()
  }
}).toSeq

val res: Seq[String] = download(providers:_*)

Or if you want the non blocking version with a Future:

def download(urls: URL*) = Future {
  urls.par.flatMap(url => {
    Try {
      io.Source.fromURL(url).getLines
    } match {
      case Success(e) => e
      case Failure(_) => Seq()
    }
  }).seq
}

val res: Future[Seq[String]] = download(providers:_*)
Comments