dark_ruby - 1 month ago
Scala Question

Parallel aggregate is not working on lists with length > 8

I'm writing a small exercise app that counts how often each unique letter (including Unicode letters) occurs in a seq of strings. I'm using aggregate for it because I want it to run in parallel.

Here's my code:

class Frequency(seq: Seq[String]) {
  type FreqMap = Map[Char, Int]

  def calculate() = {
    val freqMap: FreqMap = Map[Char, Int]()
    val pattern = "(\\p{L}+)".r
    val seqop: (FreqMap, String) => FreqMap = (fm, s) => {
      s.toLowerCase().foldLeft(freqMap){(fm, c) =>
        c match {
          case pattern(char) => fm.get(char) match {
            case None => fm+((char, 1))
            case Some(i) => fm.updated(char, i+1)
          }
          case _ => fm
        }
      }
    }

    val reduce: (FreqMap, FreqMap) => FreqMap =
      (m1, m2) => {
        m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
      }

    seq.par.aggregate(freqMap)(seqop, reduce)
  }
}


And here is the code that uses it:

object Frequency extends App {

  val text = List("abc", "abc", "abc", "abc", "abc", "abc", "abc", "abc", "abc");

  def frequency(seq: Seq[String]):Map[Char, Int] = {
    new Frequency(seq).calculate()
  }

  Console println frequency(seq=text)
}


Though I supplied "abc" 9 times, the result is Map(a -> 8, b -> 8, c -> 8), and the same happens for any number of "abc"s greater than 8.

I've looked at this, and it seems like I'm using aggregate correctly.

Any suggestions to make it work?

Answer

In your seqop you're discarding the results collected so far (the outer fm): the inner foldLeft starts from the empty freqMap instead of from fm. You need to combine those results with the new ones you're computing, for example like this:

def calculate() = {
    val freqMap: FreqMap = Map[Char, Int]()
    val pattern = "(\\p{L}+)".r
    val reduce: (FreqMap, FreqMap) => FreqMap =
      (m1, m2) => {
        m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
      }
    val seqop: (FreqMap, String) => FreqMap = (fm, s) => {
      val res = s.toLowerCase().foldLeft(freqMap){(fm, c) =>
        c match {
          case pattern(char) => fm.get(char) match {
            case None => fm+((char, 1))
            case Some(i) => fm.updated(char, i+1)
          }
          case _ => fm
        }
      }

      // I'm reusing your existing combinator function here:
      reduce(res,fm)
    }

    seq.par.aggregate(freqMap)(seqop, reduce)
}

Depending on how the parallel collection divides the work, you discard some of it. Each chunk of the input is processed sequentially by seqop, starting from freqMap, and because your seqop throws away its accumulator, each chunk contributes only the counts of its last string; the per-chunk results are then merged with reduce. In your case (9x "abc") the work is split into 8 chunks, so one chunk handles two strings and you discard exactly one result set. This varies with the input size: if you run it with, say, 17x "abc", it runs 13 parallel operations, discarding 4 result sets (on my machine, anyway; I'm not familiar with the underlying code and how it divides the work, so this probably depends on the ExecutionContext/thread pool in use and therefore on the number of CPUs/cores and so on).
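The chunking effect can be reproduced without parallel collections. The sketch below is a sequential simulation, not the library's actual code: it folds each chunk with seqop starting from the empty map and then merges the per-chunk maps with the question's combiner. The split of 9 strings into 8 chunks is a hypothetical layout chosen to mirror the numbers above, the object and method names are illustrative, and c.isLetter stands in for the \p{L} regex.

```scala
object ChunkSimulation {
  type FreqMap = Map[Char, Int]

  val empty: FreqMap = Map.empty

  // Same combiner as in the question: adds the counts of m2 into m1.
  val combine: (FreqMap, FreqMap) => FreqMap =
    (m1, m2) => m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }

  // Buggy seqop: the inner fold starts from `empty`, so the accumulator is ignored.
  val buggySeqop: (FreqMap, String) => FreqMap = (_, s) =>
    s.toLowerCase.foldLeft(empty) { (m, c) =>
      if (c.isLetter) m.updated(c, m.getOrElse(c, 0) + 1) else m
    }

  // Fixed seqop: the inner fold continues from the accumulator.
  val fixedSeqop: (FreqMap, String) => FreqMap = (acc, s) =>
    s.toLowerCase.foldLeft(acc) { (m, c) =>
      if (c.isLetter) m.updated(c, m.getOrElse(c, 0) + 1) else m
    }

  // Mimics aggregate: fold each chunk with seqop from the zero value,
  // then merge the per-chunk results with the combiner.
  def simulate(chunks: Seq[Seq[String]], seqop: (FreqMap, String) => FreqMap): FreqMap =
    chunks.map(_.foldLeft(empty)(seqop)).foldLeft(empty)(combine)

  // Hypothetical split of 9 strings into 8 chunks: one chunk gets two strings.
  val chunks: Seq[Seq[String]] = Seq("abc", "abc") +: Seq.fill(7)(Seq("abc"))

  def main(args: Array[String]): Unit = {
    println(simulate(chunks, buggySeqop)) // Map(a -> 8, b -> 8, c -> 8)
    println(simulate(chunks, fixedSeqop)) // Map(a -> 9, b -> 9, c -> 9)
  }
}
```

With the buggy seqop every chunk contributes 1 per letter, so the total equals the number of chunks; with the fixed one the chunk layout no longer affects the result.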

In general, parallel collections are a drop-in replacement for sequential collections: if you remove .par you should still get the same result, albeit usually more slowly on large inputs. If you do this with your original code, every letter gets a count of 1, which tells you that the bug is not a parallelization problem. This is a good way to check whether you're doing the right thing when using them.
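The check can be sketched without the parallel collections module at all. On a sequential collection, aggregate behaves like foldLeft(zero)(seqop) and never calls the combiner, so a plain foldLeft shows what you get after removing .par. The helper names below are hypothetical, and c.isLetter again stands in for the regex.

```scala
object SequentialCheck {
  type FreqMap = Map[Char, Int]

  // Counts the letters of one string, starting from the given map.
  def countLetters(start: FreqMap, s: String): FreqMap =
    s.toLowerCase.foldLeft(start) { (m, c) =>
      if (c.isLetter) m.updated(c, m.getOrElse(c, 0) + 1) else m
    }

  // Buggy seqop: ignores the accumulator, as in the question.
  val buggySeqop: (FreqMap, String) => FreqMap = (_, s) => countLetters(Map.empty, s)

  // Fixed seqop: continues from the accumulator.
  val fixedSeqop: (FreqMap, String) => FreqMap = (acc, s) => countLetters(acc, s)

  val text: List[String] = List.fill(9)("abc")

  def main(args: Array[String]): Unit = {
    // What aggregate computes on a sequential collection: foldLeft(zero)(seqop).
    println(text.foldLeft(Map.empty: FreqMap)(buggySeqop)) // Map(a -> 1, b -> 1, c -> 1)
    println(text.foldLeft(Map.empty: FreqMap)(fixedSeqop)) // Map(a -> 9, b -> 9, c -> 9)
  }
}
```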

And last but not least: this was harder to spot than usual because you use the same variable name twice, so the inner fm shadows the outer one. Using different names would make the code more readable and mistakes like this easier to spot.
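As a variant of the fix, seqop can start its inner foldLeft from the accumulator directly instead of calling reduce afterwards, and the two maps can get distinct names. This is a sketch under those assumptions: the object name ShadowFree and the names acc, counts and letter are illustrative; the regex and the combiner are the question's own.

```scala
object ShadowFree {
  type FreqMap = Map[Char, Int]

  private val pattern = "(\\p{L}+)".r

  // `acc` is the running total passed in by aggregate; `counts` is the map
  // being built while scanning one string, starting from `acc`.
  val seqop: (FreqMap, String) => FreqMap = (acc, s) =>
    s.toLowerCase.foldLeft(acc) { (counts, c) =>
      c match {
        case pattern(letter) => counts.updated(letter, counts.getOrElse(letter, 0) + 1)
        case _               => counts
      }
    }

  // Same combiner as in the question: adds the counts of m2 into m1.
  val reduce: (FreqMap, FreqMap) => FreqMap =
    (m1, m2) => m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
}
```

Inside calculate(), the existing call seq.par.aggregate(freqMap)(seqop, reduce) would stay as it is; since this seqop already continues from its accumulator, reduce is only needed to merge the per-chunk results.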
