mateusduboli - 1 month ago
Scala Question

Akka stream retry repeated result

I'm implementing an iterator over an HTTP resource from which I can retrieve a paged list of elements. I tried to do this with a plain Iterator, but that implementation is blocking, and since I'm using Akka, it makes my dispatcher go a little crazy.

What I want is to implement the same iterator using akka-stream. The problem is that I need a slightly different retry strategy.

The service returns a list of elements, each identified by an id, and sometimes when I query for the next page, the service returns the same products as the current page.
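The repetition the service produces can be stated as a simple check: a page is suspect if any of its ids has already been seen. A minimal sketch, assuming a hypothetical Element case class with String ids (the question doesn't specify the id type) and an illustrative PageCheck object:

```scala
// Hypothetical element type: the question only says each element has an id.
final case class Element(id: String)

object PageCheck {
  // True when the page shares at least one id with the ids already seen,
  // i.e. the service re-sent (part of) an earlier page.
  def isRepeated(seenIds: Set[String], page: Seq[Element]): Boolean =
    page.exists(e => seenIds.contains(e.id))
}
```

For example, with Set("a", "b") collected from the first page, a second response Seq(Element("b"), Element("c")) is reported as repeated because "b" comes back.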

My current algorithm is this.


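import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

// Element, service and MaxRetries are defined elsewhere; ids are assumed to be Strings
var seenIds = Set.empty[String]
var position = 0

// A page is problematic if it contains any id seen on an earlier page
def isProblematicPage(elements: Seq[Element]): Boolean = {
  val currentIds = elements.map(_.id).toSet
  val intersection = seenIds & currentIds
  val hasOnlyNewIds = intersection.isEmpty
  if (hasOnlyNewIds) {
    seenIds = seenIds | currentIds
  }
  !hasOnlyNewIds
}

def incrementPage(): Unit = {
  position += 10
}

def doBackOff(attempt: Int): Unit = {
  // Backoff logic
}

// Must live in an object (or be final/private) for @tailrec to compile
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
  if (attempt > MaxRetries) {
    incrementPage()
    Iterator.empty
  } else {
    val eventualPage = service.retrievePage(position, position + 10)

    // Blocks the calling thread for up to 5 minutes
    val page = Await.result(eventualPage, 5.minutes)

    if (isProblematicPage(page)) {
      doBackOff(attempt)
      fetchPage(attempt + 1)
    } else {
      incrementPage()
      page.iterator
    }
  }
}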


I'm doing the implementation using akka-streams, but I can't figure out how to accumulate the pages and test for repetition within the stream structure.
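On an ordinary collection, "accumulate and test" is what scanLeft does: it threads a state through the pages and emits every intermediate state, which is also the shape of Flow.scan in Akka Streams. A minimal standard-library sketch, assuming already-fetched pages and a hypothetical Element type with String ids; ScanState and PageScan are illustrative names:

```scala
// Hypothetical element type with String ids.
final case class Element(id: String)

// State carried from page to page: every id accepted so far,
// plus the verdict on the most recent page.
final case class ScanState(seenIds: Set[String], lastPageRepeated: Boolean)

object PageScan {
  val initial: ScanState = ScanState(Set.empty, lastPageRepeated = false)

  // Mirrors the question's isProblematicPage: ids are recorded only for clean pages.
  def step(state: ScanState, page: Seq[Element]): ScanState = {
    val ids = page.map(_.id).toSet
    if ((state.seenIds & ids).isEmpty) ScanState(state.seenIds | ids, lastPageRepeated = false)
    else state.copy(lastPageRepeated = true)
  }

  // scanLeft emits the initial state first, then one state per page; drop the initial one.
  def verdicts(pages: Seq[Seq[Element]]): Seq[Boolean] =
    pages.scanLeft(initial)(step).tail.map(_.lastPageRepeated)
}
```

For pages {a, b}, {b, c}, {c, d} the verdicts are false, true, false: the second page re-sends b, and because a repeated page's ids are not recorded, the third page is still accepted.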

Any suggestions?

Answer

The Flow.scan stage was good advice, but it can't handle futures, so I implemented an asynchronous version, Flow.scanAsync, which is now available in Akka 2.4.12.
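The difference between the two stages is that scanAsync's step function returns a Future, and the next element is not processed until that Future completes; like scan, it emits the zero value first. The following sketch reproduces those semantics on a plain Seq using standard-library Futures only. It illustrates the idea and is not Akka's implementation; ScanAsyncSketch is a made-up name:

```scala
import scala.concurrent.{ExecutionContext, Future}

object ScanAsyncSketch {
  // Collection analogue of Flow.scanAsync: each step waits for the previous Future,
  // feeds its result into f together with the next input, and keeps every state.
  // As with scanAsync, the zero value is the first element of the result.
  def scanAsync[S, A](inputs: Seq[A], zero: S)(f: (S, A) => Future[S])
                     (implicit ec: ExecutionContext): Future[Seq[S]] =
    inputs.foldLeft(Future.successful(Vector(zero))) { (accF, a) =>
      accF.flatMap(acc => f(acc.last, a).map(acc :+ _))
    }
}
```

In the answer, retrievePage plays the role of f: S is the previous page and A is the page's start index. Because the state is only the previous page, a repeat is detected against the immediately preceding page, not against every id seen so far as in the question's seenIds version.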

The current implementation is:

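import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Abstract members, supplied by the enclosing trait or class
val service: WebService
val maxTries: Int
val backOff: FiniteDuration
implicit val system: ActorSystem
implicit val materializer: Materializer
import system.dispatcher // ExecutionContext for map, recoverWith and after

case class ProblematicPageException(startIndex: Int)
  extends Exception(s"Page at $startIndex repeats elements of the previous page")

// Retries f after a fixed back-off; once maxTries retries have failed, gives up with zero
def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = {
  f.recoverWith {
    case _ if attempt >= maxTries =>
      Future.successful(zero)
    case _ =>
      akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f))
  }
}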

def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = {
  val lastPageIds = lastPage.map(_.id).toSet
  val currPageIds = currPage.map(_.id).toSet
  val intersection = lastPageIds & currPageIds
  intersection.nonEmpty
}

// Fetches the page at startIndex; a repeat of lastPage is turned into a failure so retry kicks in
def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = {
  retry(Seq.empty[Element]) {
    service.fetchPage(startIndex).map { currPage: Seq[Element] =>
      if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex)
      else currPage
    }
  }
}


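val pagesRange: Range = Range(0, maxItems, pageSize)

// Starts from an empty "previous page" and feeds each fetched page into the next call
val scanAsyncFlow = Flow[Int].scanAsync(Seq.empty[Element])(retrievePage)

val allElements =
  Source(pagesRange)
    .via(scanAsyncFlow)
    .mapConcat(_.toList) // flatten pages into elements; mapConcat expects an immutable Iterable
    .runWith(Sink.seq)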
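To exercise the retry policy without an ActorSystem, for example in a unit test, the same logic can be written against the standard library. This sketch swaps akka.pattern.after for a small hand-written after built on java.util.concurrent.ScheduledExecutorService; RetrySketch and its parameters are illustrative, not part of Akka:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

object RetrySketch {
  // Evaluates `value` after `delay` on the scheduler and mirrors its outcome,
  // standing in for akka.pattern.after. A synchronous throw becomes a failed Future.
  def after[T](delay: FiniteDuration, scheduler: ScheduledExecutorService)
              (value: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit =
        promise.completeWith(try value catch { case NonFatal(e) => Future.failed(e) })
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Same policy as the answer's retry: on failure wait backOff and try again;
  // once maxTries retries have failed, give up and succeed with zero.
  def retry[T](zero: T, maxTries: Int, backOff: FiniteDuration, attempt: Int = 0)
              (f: => Future[T])
              (implicit ec: ExecutionContext, scheduler: ScheduledExecutorService): Future[T] =
    f.recoverWith {
      case _ if attempt >= maxTries => Future.successful(zero)
      case _ => after(backOff, scheduler)(retry(zero, maxTries, backOff, attempt + 1)(f))
    }
}
```

As in the answer, giving up yields zero rather than a failed Future, so a page that keeps coming back repeated is skipped (it contributes an empty Seq) instead of failing the whole stream.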

Thanks Ramon for the advice :)