mateusduboli - 1 year ago
Scala Question

Akka stream retry repeated result

I'm implementing an iterator over an HTTP resource that returns a paged list of elements. I tried to do this with a plain Iterator, but it's a blocking implementation, and since I'm using Akka it makes my dispatcher go a little crazy.

I would like to implement the same iterator using Akka Streams. The problem is that I need a slightly different retry strategy.

The service returns a list of elements, each identified by an ID, and sometimes when I query for the next page, the service returns the same products that were on the current page.

My current algorithm is this.

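// Assumption: Element exposes an `id` field; ElementId stands for its type.
var seenIds = Set.empty[ElementId]
var position = 0

// Returns true when the page repeats at least one ID that was already seen.
def isProblematicPage(elements: Seq[Element]): Boolean = {
  val currentIds = elements.map(_.id).toSet
  val intersection = seenIds & currentIds
  val hasOnlyNewIds = intersection.isEmpty
  if (hasOnlyNewIds) {
    seenIds = seenIds | currentIds
  }
  !hasOnlyNewIds
}

def incrementPage(): Unit = {
  position += 10
}

def doBackOff(attempt: Int): Unit = {
  // Backoff logic
}

def fetchPage(attempt: Int = 0): Iterator[Element] = {
  if (attempt > MaxRetries) {
    return Iterator.empty
  }

  val eventualPage = service.retrievePage(position, position + 10)

  // Blocks the calling thread until the page arrives.
  val page = Await.result(eventualPage, 5.minutes)

  if (isProblematicPage(page)) {
    fetchPage(attempt + 1)
  } else {
    // The body of this branch is missing from the post; advancing the
    // position and returning the page is an assumption.
    incrementPage()
    page.iterator
  }
}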
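
The duplicate check at the heart of this algorithm can also be written without mutable state, which is the shape a stream stage needs. The following is only a minimal sketch: `Element(id: Int)` and `acceptPage` are hypothetical names, since the real `Element` type is not shown here.

```scala
// Hypothetical element type; the real Element is not shown in the question.
final case class Element(id: Int)

// Returns the updated set of seen IDs when the page contains only new IDs,
// or None when the page repeats at least one ID that was already seen.
def acceptPage(seenIds: Set[Int], page: Seq[Element]): Option[Set[Int]] = {
  val currentIds = page.map(_.id).toSet
  if ((seenIds & currentIds).isEmpty) Some(seenIds | currentIds) else None
}

// A first page with fresh IDs is accepted.
val afterFirst = acceptPage(Set.empty, Seq(Element(1), Element(2)))

// A second page that repeats ID 2 is rejected.
val repeated = afterFirst.flatMap(seen => acceptPage(seen, Seq(Element(2), Element(3))))
```

Because the seen IDs are passed in and returned rather than stored in a `var`, the same function can serve as the accumulator step of a scan-style stage.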

I'm doing the implementation using Akka Streams, but I can't figure out how to accumulate the pages and test for repetition using the stream structure.

Any suggestions?

Answer Source

The Flow.scan stage was good advice, but it cannot deal with futures, so I implemented an asynchronous version, Flow.scanAsync, which is now available in Akka 2.4.12.
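
To illustrate how an asynchronous scan carries the previous page from one step to the next, here is a minimal sketch using the built-in `scanAsync` operator. It assumes Akka 2.6 or later (where an implicit `ActorSystem` is enough to materialize a stream); `fetchPage` is a hypothetical stand-in for the real service call and simply returns ten consecutive integers per page.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ScanAsyncSketch {
  // Hypothetical stand-in for the paged HTTP call: ten consecutive IDs per page.
  def fetchPage(startIndex: Int)(implicit ec: ExecutionContext): Future[Seq[Int]] =
    Future(startIndex until startIndex + 10)

  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("scan-async-sketch")
    import system.dispatcher

    val pages: Future[Seq[Seq[Int]]] =
      Source(0 until 30 by 10)
        // The accumulator is the previously emitted page.
        .scanAsync(Seq.empty[Int]) { (lastPage, startIndex) =>
          fetchPage(startIndex).map { currPage =>
            // Fail the stage if the new page overlaps the previous one.
            require((lastPage.toSet & currPage.toSet).isEmpty,
              s"page at $startIndex repeats the previous page")
            currPage
          }
        }
        // scanAsync emits the zero element first, so drop it.
        .drop(1)
        .runWith(Sink.seq)

    try Await.result(pages, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().map(_.head))
}
```

In this sketch an overlapping page fails the stream outright; wrapping the fetch in a retry helper, as in the implementation below, is what turns that failure into another attempt.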

The current implementation is:

val service: WebService
val maxTries: Int
val backOff: FiniteDuration

// Re-runs f after a back-off delay each time it fails, up to maxTries retries.
def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = {
  f.recoverWith {
    case ex if attempt >= maxTries =>
      // The body of this case is missing from the post; falling back to
      // `zero` is an assumption.
      Future.successful(zero)
    case ex =>
      akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f))
  }
}

// Assumption: Element exposes an `id` field.
def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = {
  val lastPageIds = lastPage.map(_.id).toSet
  val currPageIds = currPage.map(_.id).toSet
  val intersection = lastPageIds & currPageIds
  intersection.nonEmpty
}

def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = {
  retry(Seq.empty[Element]) {
    service.fetchPage(startIndex).map { currPage: Seq[Element] =>
      // A repeated page fails the future, which triggers another retry.
      if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex)
      else currPage
    }
  }
}

val pagesRange: Range = Range(0, maxItems, pageSize)

val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage))
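
To try the retry behaviour in isolation, the recursion can be exercised without Akka. This is only a sketch under stated assumptions: it omits the back-off delay, takes `maxTries` as a parameter, falls back to `zero` when the retries are exhausted, and uses a hypothetical `flakyCall` that fails twice before succeeding.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RetrySketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Same shape as the retry helper above, minus the back-off delay:
  // fall back to `zero` once `maxTries` retries have failed.
  def retry[T](zero: T, maxTries: Int, attempt: Int = 0)(f: => Future[T]): Future[T] =
    f.recoverWith {
      case _ if attempt >= maxTries => Future.successful(zero)
      case _ => retry(zero, maxTries, attempt + 1)(f)
    }

  // Hypothetical flaky call: fails on the first two invocations, then succeeds.
  def flakyCall(calls: AtomicInteger): Future[Seq[Int]] =
    Future {
      if (calls.incrementAndGet() < 3) throw new RuntimeException("repeated page")
      Seq(1, 2, 3)
    }

  def main(args: Array[String]): Unit = {
    val calls = new AtomicInteger(0)
    val result = Await.result(retry(Seq.empty[Int], maxTries = 5)(flakyCall(calls)), 5.seconds)
    println(s"result=$result after ${calls.get} calls")
  }
}
```

Because `f` is a by-name parameter, each retry evaluates the call again instead of reusing the failed future.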


Thanks Ramon for the advice :)
