David Portabella - 1 year ago
Scala Question

Scala: transform an imperative-style callback function into a functional-style internal iterator

Suppose this API is given and we cannot change it:

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T): Unit
    def close(): Unit
  }

  def run(r: Receiver[Int]): Unit = {
    new Thread() {
      override def run(): Unit = {
        (0 to 9).foreach { i =>
          r.receive(i)
        }
        r.close()
      }
    }.start()
  }
}

In this example, ProviderAPI.run takes a Receiver, calls receive 10 times and then calls close. Typically, run would call receive based on a collection which could be infinite.

This API is intended to be used in an imperative style, as an external iterator. If our application needs to filter, map and print this input, we need to implement a Receiver that mixes all these operations:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int): Unit = {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close(): Unit = {}
  }

  ProviderAPI.run(new MyReceiver())
}

Now, the question is how to use the ProviderAPI in a functional style, as an internal iterator (without changing the implementation of ProviderAPI, which is given to us). Note that ProviderAPI could also call receive infinitely many times, so collecting everything in a list is not an option (also, we should handle each result one by one, instead of collecting all the input first and processing it afterwards).

I am asking how to implement a ReceiverToIterator class, so that we can use the ProviderAPI in a functional style:

object Main extends App {
  val iterator = new ReceiverToIterator[Int] // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

Ugly solution

I've implemented the following, and it works.
However, I guess that there is a simpler way to achieve this.

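import java.util.concurrent.Semaphore
import ProviderAPI.Receiver

class ReceiverToIterator[T] extends Receiver[T] with Iterator[T] {
  var lastEntry: T = _
  var waitingToReceive = new Semaphore(1)
  var waitingToBeConsumed = new Semaphore(1)
  var eof = false

  // no entry is available yet, so the consumer has to wait
  waitingToReceive.acquire()

  def receive(entry: T): Unit = {
    println("ReceiverToIterator.receive(" + entry + "). START.")
    waitingToBeConsumed.acquire() // wait until the previous entry has been consumed
    lastEntry = entry
    waitingToReceive.release() // signal that an entry is available
    println("ReceiverToIterator.receive(" + entry + "). END.")
  }

  def close(): Unit = {
    waitingToBeConsumed.acquire() // wait until the last entry has been consumed
    eof = true
    waitingToReceive.release() // wake up the consumer so that it sees eof
  }

  def hasNext: Boolean = {
    // block until an entry or eof is available, without consuming it
    waitingToReceive.acquire()
    waitingToReceive.release()
    !eof
  }

  def next(): T = {
    waitingToReceive.acquire()
    if (eof) { waitingToReceive.release(); throw new NoSuchElementException }
    val entryToReturn = lastEntry
    waitingToBeConsumed.release() // let the producer deliver the next entry
    entryToReturn
  }
}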

Still ugly solution

Solution using the BlockingQueue[Option[T]] as suggested by nadavwr.

As I come across this callback-to-iterator pattern quite often, I've implemented it in a reusable way, like this:

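trait OptionNextToIterator[T] extends Iterator[T] {
  // returns Some(entry) for the next entry, or None at the end of the input; may block
  def getOptionNext: Option[T]

  var answerReady: Boolean = false
  var eof: Boolean = false
  var element: T = _

  def hasNext: Boolean = {
    prepareNextAnswerIfNecessary()
    !eof
  }

  def next(): T = {
    prepareNextAnswerIfNecessary()
    if (eof) { throw new NoSuchElementException }
    val retVal = element
    answerReady = false
    retVal
  }

  def prepareNextAnswerIfNecessary(): Unit = {
    if (!answerReady) { // fetch a new answer only if none is pending
      synchronized {
        getOptionNext match {
          case None => eof = true
          case Some(e) => element = e
        }
        answerReady = true
      }
    }
  }
}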

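import java.util.concurrent.ArrayBlockingQueue

trait QueueToIterator[T] extends OptionNextToIterator[T] {
  val queueCapacity: Int
  val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)
  var queueClosed = false

  def queuePut(entry: T): Unit = {
    if (queueClosed) { throw new IllegalStateException("The queue has already been closed.") }
    queue.put(Some(entry)) // blocks while the queue is full
  }

  def queueClose(): Unit = {
    queueClosed = true
    queue.put(None) // None marks the end of the input
  }

  def getOptionNext: Option[T] = queue.take() // blocks while the queue is empty
}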

Now I can implement the ReceiverToIterator asked for in this post like this:

import ProviderAPI.Receiver

class ReceiverToIterator[T](val queueCapacity: Int = 1) extends Receiver[T] with QueueToIterator[T] {
  def receive(entry: T): Unit = { queuePut(entry) }
  def close(): Unit = { queueClose() }
  // other things if necessary
}

object Main extends App {
  val receiverToIterator = new ReceiverToIterator[Int](queueCapacity = 3)
  ProviderAPI.run(receiverToIterator)

  Thread.sleep(3000) // check that ProviderAPI.run is not blocked during its first 3 calls to receive
  receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println)
}

But I still think that there should be a simpler solution.
I needed to split the implementation into two traits: QueueToIterator, which implements getOptionNext (instead of hasNext and next), and OptionNextToIterator, which transforms this into a real iterator.
That's because BlockingQueue lacks a method that "Retrieves, but does not remove, the head of this queue, waiting if necessary until an element becomes available" (that is, a blocking version of peek, in the way that take is the blocking version of poll).
Also, I am using the synchronized block inside prepareNextAnswerIfNecessary, which I don't like very much.

Last update

I've posted two answers to this question below:

Answer Source

Updated: BlockingQueue of 1 entry

What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.

Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.
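To see why a single-slot buffer couples the producer to the consumer, here is a small sketch that uses java.util.concurrent.ArrayBlockingQueue directly. It calls the non-blocking offer instead of put so that the demo cannot hang; the object name BlockingDemo is only illustrative.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BlockingDemo {
  // A queue with a single slot, like the hand-written semaphore version.
  val queue = new ArrayBlockingQueue[Int](1)

  // offer is the non-blocking sibling of put: it returns false where put would block.
  val firstAccepted: Boolean = queue.offer(1)  // the slot is free
  val secondAccepted: Boolean = queue.offer(2) // the slot is still occupied

  // Only after the consumer takes the head can the producer continue.
  val taken: Int = queue.take()
  val thirdAccepted: Boolean = queue.offer(2)
}
```

With put in place of offer, the second call would park the producer thread until the consumer calls take, which is the coupling described above.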

Update: @gzm0 mentioned that BlockingQueue doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.

Update: Here's a code fragment. It can be made to fit with your Receiver.
Some of it inspired by Iterator.buffered. Note that peek is a misleading name, as it may block -- and so will hasNext.

// fairness enabled -- you probably want to preserve order...
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead(): Unit = { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext: Boolean = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next(): T = {
    val opt = peek
    if (opt.isEmpty) throw new NoSuchElementException
    invalidateHead() // only after a real entry, so that hasNext stays false after `None`
    opt.get
  }
}
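To make the fragment concrete, here is a minimal sketch of how it might be fitted to the Receiver from the question. The Receiver trait is redeclared so that the block is self-contained, and the names QueueReceiver, capacity and QueueReceiverDemo are illustrative assumptions, not part of the original API. It assumes a single consumer thread, since head and headDefined are not synchronized.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Redeclared here only to keep the sketch self-contained; it mirrors ProviderAPI.Receiver.
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical adapter: the producer thread calls receive/close,
// a single consumer thread uses the Iterator methods.
class QueueReceiver[T](capacity: Int = 1) extends Receiver[T] with Iterator[T] {
  // Some(entry) carries data, None marks the end of the stream.
  private val queue = new ArrayBlockingQueue[Option[T]](capacity, true)

  private var head: Option[T] = None
  private var headDefined = false

  // Blocks until the producer has delivered an entry or closed the stream.
  private def peek: Option[T] = {
    if (!headDefined) {
      head = queue.take()
      headDefined = true
    }
    head
  }

  def receive(entry: T): Unit = queue.put(Some(entry))
  def close(): Unit = queue.put(None)

  def hasNext: Boolean = peek.isDefined

  def next(): T = peek match {
    case Some(entry) =>
      headDefined = false // the entry is consumed; fetch a new head next time
      entry
    case None =>
      throw new NoSuchElementException("the receiver has been closed")
  }
}

object QueueReceiverDemo {
  // Feeds 0 to 9 from a separate thread, as ProviderAPI.run does in the question.
  def collectEvenEntries(): List[String] = {
    val receiver = new QueueReceiver[Int]()
    val producer = new Thread(() => {
      (0 to 9).foreach(receiver.receive)
      receiver.close()
    })
    producer.start()
    receiver.filter(_ % 2 == 0).map("Entry#" + _).toList
  }
}
```

The toList call is there only to make the result easy to inspect for this finite input; with an unbounded producer you would end the chain with foreach instead.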

Alternative: Iteratees

Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.

Alternatively, you could consider an iteratee-based solution. Iteratees are different than iterators in that the consumer isn't responsible for advancing the iteration -- the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.

You won't get nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that looks reasonable to the eye.
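The fold idea can be sketched without any iteratee library. The following is not the Play or Scalaz API: FoldingReceiver is a hypothetical helper that applies a fold step to each entry as the producer pushes it and completes a scala.concurrent.Promise on close. Unlike a real iteratee, it runs the step on the producer's thread, so it only illustrates the inversion of control, not the thread-pool scheduling.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Redeclared to keep the sketch self-contained; it mirrors ProviderAPI.Receiver.
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical helper: folds over the entries as the producer pushes them.
class FoldingReceiver[T, S](initial: S)(step: (S, T) => S) extends Receiver[T] {
  private var state: S = initial // only touched by the producer thread
  private val promise = Promise[S]()

  def receive(entry: T): Unit = { state = step(state, entry) }
  def close(): Unit = promise.success(state)

  // Completed with the final state once the producer calls close.
  def result: Future[S] = promise.future
}

object FoldingDemo {
  // Prints the even entries one by one and returns how many were printed.
  def run(): Int = {
    val receiver = new FoldingReceiver[Int, Int](0)((count, entry) =>
      if (entry % 2 == 0) { println("Entry#" + entry); count + 1 } else count
    )
    val producer = new Thread(() => {
      (0 to 9).foreach(receiver.receive)
      receiver.close()
    })
    producer.start()
    Await.result(receiver.result, 5.seconds)
  }
}
```

The state here is a single counter, so nothing accumulates even if the producer never stops; the filter and map steps live inside the fold function instead of in a chain of iterator calls.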

To read more about iteratees, I suggest taking a peek at PlayFramework 2.X's iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.
