I have a sequence of many small files (~1-8 KB) and I want to calculate the word count across them. Specifically, the code I have is:

```scala
val totalWordCount = sc.union(
  files.map(path => sc.textFile(path))
).flatMap(line => line.split(" "))
 .map((_, 1))
 // I use a hash partitioner for better performance of reduceByKey
 .partitionBy(new HashPartitioner(numPartitions))
 .reduceByKey(_ + _)
```
`sc.textFile` is not optimized for this case. Remember that the optimal partition size is on the order of 100 MB, and right now your `sc.union` RDD is getting one partition per file -- each under 8 KB. Spark overhead is going to absolutely dominate anything you do in this paradigm.

You mentioned "increasing the partitions" in your question, but I think here you probably want to *reduce* the number of partitions. I'm not sure where `numPartitions` came from, but it should be roughly total data size / 100 MB. Your `.partitionBy` step performs a full shuffle, so there will still be lots of overhead from the original too-many-partitions RDD, but it will likely perform better downstream.
Here's something else to try: a no-shuffle coalesce on the union:

```scala
val optimalNPartitions = ??? // calculate total size / 100MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
  .flatMap(line => line.split(" "))
  .coalesce(optimalNPartitions, shuffle = false) // try with shuffle = true as well!
  .map((_, 1))
  .reduceByKey(_ + _)
```
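One way to estimate the total input size (and from it `optimalNPartitions`) is to ask the Hadoop FileSystem for each file's length. A sketch only, assuming `files` is a `Seq[String]` of paths and `sc` is your `SparkContext`:

```scala
// Sketch: estimate optimalNPartitions from the on-disk size of the inputs.
// Assumes `files: Seq[String]` and an active SparkContext `sc`.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val totalBytes = files.map(p => fs.getFileStatus(new Path(p)).getLen).sum
val targetPartitionBytes = 100L * 1024 * 1024 // ~100 MB per partition
val optimalNPartitions = math.max(1, (totalBytes / targetPartitionBytes).toInt)
```

With thousands of tiny files, listing them one by one like this can itself be slow; `fs.listStatus` on the parent directory is another option.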
While you say you're partitioning to a new hash partitioner to make `reduceByKey` more efficient, this is actually wrong.

Let's look at the two models. First, the one you had: `partitionBy` followed by `reduceByKey`. The partition step does a full shuffle against the new hash partitioner -- all of the data needs to move across the network. When you then call reduce, all the like keys are already in the same place, so no further shuffle needs to happen.
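In code, model 1 looks like this (a sketch -- `pairs` and `numPartitions` are assumed to exist, as in your snippet):

```scala
import org.apache.spark.HashPartitioner

// Model 1: explicit repartition first, then reduce.
val counts1 = pairs                                // assumed RDD[(String, Int)]
  .partitionBy(new HashPartitioner(numPartitions)) // full shuffle: every record crosses the network
  .reduceByKey(_ + _)                              // no further shuffle: like keys already colocated
```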
Second: leave out `partitionBy` and just call `reduceByKey`. In this model, you come into the reduce with no partitioner, so you do have to shuffle. But before each key is shuffled, it is reduced locally -- if you had the word "dog" 100 times on one partition, you shuffle `("dog", 100)` instead of `("dog", 1)` 100 times. See where I'm going with this? Reduce actually requires only a partial shuffle, whose size is determined by the sparsity of keys: if you only have a few unique keys, very little is shuffled; if everything is unique, everything is shuffled.
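To make that local combine concrete, here's a plain-Scala sketch of what the map-side combiner effectively does with one partition's data (the values are illustrative):

```scala
// One partition's raw words, before any shuffle:
val partitionWords = Seq.fill(100)("dog") ++ Seq("cat", "cat")

// The map-side combiner reduces locally first:
val localCounts = partitionWords.groupBy(identity).map { case (w, ws) => (w, ws.size) }
// localCounts == Map("dog" -> 100, "cat" -> 2)
// Only 2 records cross the network instead of 102.
```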
Clearly model 2 is what we want. Get rid of that `partitionBy`.