hstoerr hstoerr - 4 months ago 23
Scala Question

Functional processing of Scala streams without OutOfMemory errors

Is it possible to apply functional programming to Scala streams such that the stream is processed sequentially, but the already processed part of the stream can be garbage collected?

For example, I define a

Stream
that contains the numbers from
start
to
end
:

def fromToStream(start: Int, end: Int) : Stream[Int] = {
if (end < start) Stream.empty
else start #:: fromToStream(start+1, end)
}


If I sum up the values in a functional style:

println(fromToStream(1,10000000).reduceLeft(_+_))


I get an
OutOfMemoryError
- perhaps since the stackframe of the call to
reduceLeft
holds a reference to the head of the stream. But if I do this in iterative style, it works:

var sum = 0
for (i <- fromToStream(1,10000000)) {
sum += i
}


Is there a way to do this in a functional style without getting an
OutOfMemory
?

UPDATE: This was a bug in scala that is fixed now. So this is more or less out of date now.

Answer

Yes, you can. The trick is to use tail recursive methods, so that the local stack frame contains the only reference to the Stream instance. Since the method is tail-recursive, the local reference to the previous Stream head will be erased once it recursively calls itself, thus enabling the GC to collect the start of the Stream as you go.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Also, you must ensure that the thing you pass to the method last above has only one reference on the stack. If you store a Stream into a local variable or value, it will not be garbage collected when you call the last method, since its argument is not the only reference left to Stream. The code below runs out of memory.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

To summarize:

  1. Use tail-recursive methods
  2. Annotate them as tail-recursive
  3. When you call them, ensure that their argument is the only reference to the Stream

EDIT:

Note that this also works and does not result in an out of memory error:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

And in the case of reduceLeft that you require, you would have to define a helper method with an accumulator argument for the result.

For reduceLeft, you need an accumulator argument, which you can set to a certain value using default arguments. A simplified example:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032