David Portabella David Portabella - 1 year ago 55
Scala Question

scala, transform a callback pattern to a functional style internal iterator

Suppose this API is given and we cannot change it:

object ProviderAPI {

trait Receiver[T] {
def receive(entry: T)
def close()

def run(r: Receiver[Int]) {
new Thread() {
override def run() {
(0 to 9).foreach { i =>

In this example,
takes a
, calls
10 times and then closes. Typically,
would call
based on a collection which could be infinite.

This API is intended to be used in imperative style, like an external iterator. If our application needs to filter, map and print this input, we need to implement a Receiver which mixes all these operations:

object Main extends App {
class MyReceiver extends ProviderAPI.Receiver[Int] {
def receive(entry: Int) {
if (entry % 2 == 0) {
println("Entry#" + entry)
def close() {}

ProviderAPI.run(new MyReceiver())

Now, the question is how to use the ProviderAPI in functional style, internal iterator (without changing the implementation of ProviderAPI, which is given to us). Note that ProviderAPI could also call
infinite times, so it is not an option to collect everything in a list (also, we should handle each result one by one, instead of collecting all the input first, and processing it afterwards).

I am asking how to implement such a
, so that we can use the ProviderAPI in functional style:

object Main extends App {
val iterator = new ReceiverToIterator[Int] // how to implement this?
.filter(_ % 2 == 0)
.map("Entry#" + _)


Here are four solutions:

Answer Source

Updated: BlockingQueue of 1 entry

What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.

Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.

Update: @gzm0 mentioned that BlockingQueue doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.

Update: Here's a code fragment. It can be made to fit with your Receiver.
Some of it inspired by Iterator.buffered. Note that peek is a misleading name, as it may block -- and so will hasNext.

// fairness enabled -- you probably want to preserve order...
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get

Alternative: Iteratees

Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.

Alternatively, you could consider an iteratee-based solution. Iteratees are different than iterators in that the consumer isn't responsible for advancing the iteration -- the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.

You won't get nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that does look reasonable on the eye.

To read more about iteratees, I suggest taking a peek at PlayFramework 2.X's iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.