hstoerr hstoerr - 1 year ago 131
Scala Question

Functional processing of Scala streams without OutOfMemory errors

Is it possible to apply functional programming to Scala streams such that the stream is processed sequentially, but the already processed part of the stream can be garbage collected?

For example, I define a Stream that contains the numbers from start to end:

def fromToStream(start: Int, end: Int): Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start + 1, end)
}

If I sum up the values in a functional style:

fromToStream(1,10000000).reduceLeft(_+_)

I get an OutOfMemoryError, perhaps because the stack frame of the call to reduceLeft holds a reference to the head of the stream. But if I do this in iterative style, it works:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

Is there a way to do this in a functional style without getting an OutOfMemoryError?

UPDATE: This was a bug in Scala that has since been fixed, so this question is more or less out of date.

Answer Source

Yes, you can. The trick is to use tail-recursive methods, so that the local stack frame contains the only reference to the Stream instance. Because the method is tail-recursive, the compiler turns the recursive call into a loop that reuses the same stack frame, so the local reference to the previous Stream head is overwritten on each step. That lets the GC collect the start of the Stream as you go.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Also, you must ensure that the thing you pass to the method last above has only one reference on the stack. If you store a Stream into a local variable or value, it will not be garbage collected when you call the last method, since its argument is not the only reference left to Stream. The code below runs out of memory.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

To summarize:

  1. Use tail-recursive methods
  2. Annotate them as tail-recursive
  3. When you call them, ensure that their argument is the only reference to the Stream
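
The three rules above can be sketched as a small generic helper. This is only an illustration: the names StreamFold, foldStream and sumUpTo are made up for this example and do not come from the standard library. Whether the processed prefix is actually collected still depends on the Scala version and the JVM.

```scala
import scala.annotation.tailrec

object StreamFold {
  // Rules 1 and 2: tail-recursive and annotated, so the compiler rejects
  // the method if the recursive call is not in tail position.
  @tailrec
  def foldStream[A, B](s: Stream[A], acc: B)(f: (B, A) => B): B =
    if (s.isEmpty) acc
    else foldStream(s.tail, f(acc, s.head))(f)

  // Rule 3: the stream is built inside the argument expression, so no
  // local val or var holds on to its head.
  def sumUpTo(n: Int): Long =
    foldStream(Stream.range(0, n), 0L)((total, i) => total + i)
}
```

Calling StreamFold.sumUpTo(n) sums the numbers from 0 until n in a functional style while keeping the stream out of any named variable. On Scala 2.13 and later, Stream is deprecated in favour of LazyList; the same pattern should carry over.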


Note that this also works and does not result in an out of memory error:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999
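
The reason is that a def body is evaluated again on every reference, so each call to last(s) receives a brand-new stream that nothing else points to, whereas a val keeps one stream reachable for as long as the val itself is. A small sketch that makes the difference visible, using hypothetical names (DefVsVal, fresh, kept):

```scala
object DefVsVal {
  // A def is re-evaluated on every reference: each use builds a new Stream.
  def fresh: Stream[Int] = Stream.range(0, 5)

  // A val is evaluated once: every use sees the same Stream instance,
  // and the val keeps its head reachable.
  val kept: Stream[Int] = Stream.range(0, 5)

  // Reference equality shows whether two uses share one instance.
  def freshIsShared: Boolean = fresh eq fresh
  def keptIsShared: Boolean = kept eq kept
}
```

Here DefVsVal.freshIsShared is false and DefVsVal.keptIsShared is true, which is why passing the def to a tail-recursive method leaves no extra reference to the head behind.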


For reduceLeft, which is what you need, you would define a helper method with an accumulator argument for the result. The accumulator can be given its initial value with a default argument. A simplified example:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032
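
Note that the result above is negative because the true sum of the numbers from 0 until 10000000 (49999995000000) does not fit in an Int and wraps around. A minimal sketch of the same accumulator pattern with a Long accumulator follows; the names StreamSum and sumLong are hypothetical and not part of the original answer.

```scala
import scala.annotation.tailrec

object StreamSum {
  // Same shape as rcl above, but the accumulator is a Long so the
  // running total does not wrap around at Int.MaxValue.
  @tailrec
  def sumLong(s: Stream[Int], acc: Long = 0L): Long =
    if (s.isEmpty) acc
    else sumLong(s.tail, acc + s.head)
}
```

With this version, StreamSum.sumLong(Stream.range(0, 10000000)) should yield 49999995000000 instead of an overflowed value.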