David Portabella - 4 months ago
Scala Question

Scala: transform an imperative-style callback function into a functional-style internal iterator

Suppose this API is given and we cannot change it

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T): Unit
    def close(): Unit
  }

  // Pushes 0 to 9 to the receiver from a background thread, then closes it.
  def run(r: Receiver[Int]): Unit = {
    new Thread() {
      override def run(): Unit = {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}


In this example, ProviderAPI.run takes a Receiver, calls receive(i) 10 times and then closes. Typically, ProviderAPI.run would call receive(i) based on a collection which could be infinite.

This API is intended to be used in imperative style, external iterator. If our application needs to filter, map and print this input, we need to implement a Receiver which mixes all these operations:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int): Unit = {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close(): Unit = {}
  }

  ProviderAPI.run(new MyReceiver())
}


Now, the question is how to use the ProviderAPI in functional style, internal iterator (without changing the implementation of ProviderAPI, which is given to us). Note that ProviderAPI could also call receive(i) infinitely many times, so collecting everything in a list is not an option (also, we should handle each result one by one, instead of collecting all the input first and processing it afterwards).

I am asking how to implement a ReceiverToIterator function, so that we can use the ProviderAPI in functional style:

object Main extends App {
  val iterator = new ReceiverToIterator[Int] // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}





ugly solution

I've implemented the following, and it works.
However I guess that there is a simpler way to achieve this.

import java.util.concurrent.Semaphore
import ProviderAPI.Receiver

class ReceiverToIterator[T] extends Receiver[T] with Iterator[T] {
  var lastEntry: T = _
  var waitingToReceive = new Semaphore(1)
  var waitingToBeConsumed = new Semaphore(1)
  var eof = false

  // start with no entry available, so that hasNext/next block until the first receive
  waitingToReceive.acquire()

  def receive(entry: T): Unit = {
    println("ReceiverToIterator.receive(" + entry + "). START.")
    waitingToBeConsumed.acquire() // wait until the previous entry has been consumed
    lastEntry = entry
    waitingToReceive.release()
    println("ReceiverToIterator.receive(" + entry + "). END.")
  }

  def close(): Unit = {
    println("ReceiverToIterator.close().")
    eof = true
    waitingToReceive.release()
  }

  def hasNext: Boolean = {
    println("ReceiverToIterator.hasNext().START.")
    // block until an entry (or eof) is available, without consuming it
    waitingToReceive.acquire()
    waitingToReceive.release()
    println("ReceiverToIterator.hasNext().END.")
    !eof
  }

  def next(): T = {
    println("ReceiverToIterator.next().START.")
    waitingToReceive.acquire()
    if (eof) { throw new NoSuchElementException }
    val entryToReturn = lastEntry
    waitingToBeConsumed.release()
    println("ReceiverToIterator.next().END.")
    entryToReturn
  }
}





still ugly solution

Solution using the BlockingQueue[Option[T]] as suggested by nadavwr.

As I run into this callback-to-iterator pattern quite often, I've implemented it to be reusable like this:

trait OptionNextToIterator[T] extends Iterator[T] {
  def getOptionNext: Option[T]

  var answerReady: Boolean = false
  var eof: Boolean = false
  var element: T = _

  def hasNext: Boolean = {
    prepareNextAnswerIfNecessary()
    !eof
  }

  def next(): T = {
    prepareNextAnswerIfNecessary()
    if (eof) { throw new NoSuchElementException }
    val retVal = element
    answerReady = false
    retVal
  }

  // fetches the next answer (an element or eof) unless one is already pending
  def prepareNextAnswerIfNecessary(): Unit = {
    if (!answerReady) {
      synchronized {
        getOptionNext match {
          case None => eof = true
          case Some(e) => element = e
        }
        answerReady = true
      }
    }
  }
}

import java.util.concurrent.ArrayBlockingQueue

trait QueueToIterator[T] extends OptionNextToIterator[T] {
  val queueCapacity: Int
  val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)
  var queueClosed = false

  def queuePut(entry: T): Unit = {
    if (queueClosed) { throw new IllegalStateException("The queue has already been closed.") }
    queue.put(Some(entry))
  }

  // None marks the end of the input
  def queueClose(): Unit = {
    queueClosed = true
    queue.put(None)
  }

  def getOptionNext: Option[T] = queue.take()
}


Now I can implement the ReceiverToIterator from the question like this:

class ReceiverToIterator[T](val queueCapacity: Int = 1) extends Receiver[T] with QueueToIterator[T] {
  def receive(entry: T): Unit = { queuePut(entry) }
  def close(): Unit = { queueClose() }
  // other things if necessary
}

val receiverToIterator = new ReceiverToIterator[Int](queueCapacity = 3)
ProviderAPI.run(receiverToIterator)

Thread.sleep(3000) // test that ProviderAPI.run is not blocked before 3 calls at this point
receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println)


But I still think that there should be a simpler solution. I needed to split the implementation into two traits: QueueToIterator, which implements getOptionNext (instead of next() and hasNext()), and OptionNextToIterator, which transforms this into a real iterator. That's because BlockingQueue lacks a method that "Retrieves, but does not remove, the head of this queue, waiting if necessary until an element becomes available" (that is, a blocking counterpart of peek(), the way take() is the blocking counterpart of poll()). Also, I am using a synchronized block inside OptionNextToIterator, which I don't like very much.




Last update

I've posted two answers to this question below:


Answer

Updated: BlockingQueue of 1 entry

What you've implemented here is essentially Java's BlockingQueue, with a queue size of 1.

Main characteristic: uber-blocking. A slow consumer will kill your producer's performance.

Update: @gzm0 mentioned that BlockingQueue doesn't cover EOF. You'll have to use BlockingQueue[Option[T]] for that.
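As a rough illustration of the BlockingQueue[Option[T]] idea, here is a minimal sketch. The names QueueReceiver and QueueDemo are made up for this example, and the Receiver trait is repeated from the question only so that the snippet compiles on its own. It assumes a single producer thread and a single consumer thread, and it leans on Iterator.continually and takeWhile from the standard library rather than a hand-written hasNext/next.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Same shape as ProviderAPI.Receiver in the question, repeated to keep the sketch self-contained.
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical adapter: the producer pushes into a bounded queue,
// the consumer pulls from it through an ordinary Iterator.
class QueueReceiver[T](capacity: Int = 1) extends Receiver[T] {
  private val queue = new ArrayBlockingQueue[Option[T]](capacity)

  def receive(entry: T): Unit = queue.put(Some(entry)) // blocks while the queue is full
  def close(): Unit = queue.put(None)                  // None marks end of input

  // take() blocks until something arrives; takeWhile stops at the first None.
  // A val (not a def) so that only one iterator ever drains the queue.
  val iterator: Iterator[T] =
    Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get)
}

object QueueDemo {
  def run(): List[String] = {
    val r = new QueueReceiver[Int]()
    // Stand-in for ProviderAPI.run: push 0 to 9 from another thread, then close.
    new Thread(new Runnable {
      def run(): Unit = {
        (0 to 9).foreach(i => r.receive(i))
        r.close()
      }
    }).start()
    r.iterator.filter(_ % 2 == 0).map("Entry#" + _).toList
  }
}
```

With the question's API, ProviderAPI.run(r) would take the place of the hand-rolled producer thread. Both hasNext and next on the resulting iterator may block, just like the peek-based fragment.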

Update: Here's a code fragment. It can be made to fit with your Receiver.
Some of it inspired by Iterator.buffered. Note that peek is a misleading name, as it may block -- and so will hasNext.

// fairness enabled -- threads blocked on put/take are served in FIFO order,
// which matters for ordering if several producers push concurrently
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}

Alternative: Iteratees

Iterator-based solutions will generally involve more blocking. Conceptually, you could use continuations on the thread doing the iteration to avoid blocking the thread, but continuations mess with Scala's for-comprehensions, so no joy down that road.

Alternatively, you could consider an iteratee-based solution. Iteratees are different than iterators in that the consumer isn't responsible for advancing the iteration -- the producer is. With iteratees, the consumer basically folds over the entries pushed by the producer over time. Folding each next entry as it becomes available can take place in a thread pool, since the thread is relinquished after each fold completes.

You won't get nice for-syntax for iteration, and the learning curve is a little challenging, but if you feel confident using a foldLeft you'll end up with a non-blocking solution that is reasonably easy on the eye.
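To give a feel for the push-and-fold idea without pulling in a library, here is a minimal sketch in plain Scala. It is not Play's or Scalaz's iteratee API: FoldingReceiver and FoldDemo are hypothetical names, the Receiver trait is repeated from the question so the snippet stands alone, and the fold step simply runs on whatever thread the producer calls receive from (a real iteratee library would schedule it differently). It also assumes receive and close are called sequentially by one producer.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Same shape as ProviderAPI.Receiver in the question, repeated to keep the sketch self-contained.
trait Receiver[T] {
  def receive(entry: T): Unit
  def close(): Unit
}

// Hypothetical helper: folds each pushed entry into a running state
// and publishes the final state through a Future once the producer closes.
class FoldingReceiver[T, S](zero: S)(step: (S, T) => S) extends Receiver[T] {
  private var state: S = zero      // only touched by the producer's thread
  private val done = Promise[S]()

  def receive(entry: T): Unit = { state = step(state, entry) }
  def close(): Unit = { done.success(state) }

  def result: Future[S] = done.future
}

object FoldDemo {
  def run(): Vector[String] = {
    // filter + map expressed as a single fold step
    val r = new FoldingReceiver[Int, Vector[String]](Vector.empty)((acc, i) =>
      if (i % 2 == 0) acc :+ ("Entry#" + i) else acc)
    // Stand-in for ProviderAPI.run: push 0 to 9, then close.
    (0 to 9).foreach(i => r.receive(i))
    r.close()
    Await.result(r.result, 1.second)
  }
}
```

Nothing here blocks the producer: the consumer only sees the outcome when the Future completes. For an infinite input you would have the step perform its side effect (for example println) per entry instead of accumulating a collection.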

To read more about iteratees, I suggest taking a peek at PlayFramework 2.X's iteratee reference. The documentation describes their stand-alone iteratee library, which is 100% usable outside the context of Play. Scalaz 7 also has a comprehensive iteratee library.