Based on https://github.com/akka/akka/blob/aa8c253d141a96b65ac19f781106fa82d457c2c2/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala#L197-L211
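I don't get why the parameter of Source.fromIterator is a function returning an iterator rather than the iterator itself. Why not use a by-name signature such as
def fromIterator(iterator: => Iterator[T])
so that callers don't have to write
Source.fromIterator(() => myIterator)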
As per the docs:
The iterator will be created anew for each materialization, which is the reason the method takes a function rather than an iterator directly.
Stream stages are supposed to be reusable, so you can materialize them more than once. A given iterator, however, can (often) be consumed only once. If fromIterator created a Source that referred to an existing iterator (whether passed by name or by reference), a second attempt to materialize it could fail because the underlying iterator would already be exhausted.
To get around this, the source needs to be able to instantiate a new iterator, so fromIterator lets you supply the necessary logic as a supplier function.
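The same idea can be sketched in plain Scala without Akka. This is only an illustration: ReplayableSource and ReplayableSourceDemo are hypothetical names invented here, and run() merely stands in for materializing a stream. It shows that a supplier function helps only when it builds a new iterator on each call.

```scala
// Hypothetical stand-in for a reusable stream stage; not part of Akka.
final class ReplayableSource[T](newIterator: () => Iterator[T]) {
  // Each "materialization" asks the supplier for an iterator and drains it.
  def run(): List[T] = newIterator().toList
}

object ReplayableSourceDemo {
  // Supplier that closes over one shared iterator: only the first run sees elements.
  def sharedRuns(): (List[Int], List[Int]) = {
    val iter = Iterator.range(0, 2)
    val src = new ReplayableSource(() => iter)
    (src.run(), src.run())
  }

  // Supplier that builds a new iterator on every call: every run sees all elements.
  def freshRuns(): (List[Int], List[Int]) = {
    val src = new ReplayableSource(() => Iterator.range(0, 2))
    (src.run(), src.run())
  }

  def main(args: Array[String]): Unit = {
    println(sharedRuns()) // (List(0, 1),List())
    println(freshRuns())  // (List(0, 1),List(0, 1))
  }
}
```

The function parameter makes the "new iterator per run" contract explicit, but it is still up to the caller to construct the iterator inside the function body rather than capture an existing one.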
Here's an example of something we don't want to happen:
    import akka.stream.scaladsl.Source
    import scala.concurrent.Await
    import scala.concurrent.duration._

    implicit val system = akka.actor.ActorSystem.create("test")
    // the materializer picks up the implicit ActorSystem
    implicit val mat = akka.stream.ActorMaterializer()

    val iter = Iterator.range(0, 2)
    // pretend we pass the iterator directly...
    val src = Source.fromIterator(() => iter)

    Await.result(src.runForeach(println), 2.seconds)
    // 0
    // 1
    // res0: akka.Done = Done

    Await.result(src.runForeach(println), 2.seconds)
    // res1: akka.Done = Done
    // No results???
That's bad because the Source src is not reusable: it doesn't produce the same output on subsequent runs. However, if the function creates a new iterator each time it is called, it works:
    // the function builds a new iterator on every call
    val iterFunc = () => Iterator.range(0, 2)
    val src = Source.fromIterator(iterFunc)

    Await.result(src.runForeach(println), 2.seconds)
    // 0
    // 1
    // res0: akka.Done = Done

    Await.result(src.runForeach(println), 2.seconds)
    // 0
    // 1
    // res1: akka.Done = Done