Havnar Havnar - 1 year ago 157
Scala Question

Recursive Dataframe operations

In my spark application I would like to do operations on a dataframe in a loop and write the result to hdfs.


var df = emptyDataframe
for n = 1 to 200000{
df = df.mergeWith(somedf)

In the above example I get good results when "mergeWith" does a unionAll.

However, when in "mergeWith" I do a (simple) join, the job gets really slow (>1h with 2 executors with 4 cores each) and never finishes (job aborts itself).

In my scenario I throw in ~50 iterations with files that just contain ~1mb of text data.

Because order of merges is important in my case, I'm suspecting this is due to the DAG generation, causing the whole thing to be run at the moment I store away the data.

Right now I'm attempting to use a .persist on the merged data frame but that also seems to go rather slowly.


As the job was running i noticed (even though I did a count and .persist) the dataframe in memory didn't look like a static dataframe.
It looked like a stringed together path to all the merges it had been doing, effectively slowing down the job linearly.

Am I right to assume the
var df
is the culprit of this?

spiraling out of controle

breakdown of the issue as I see it:

dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....

When I would expect DF' A C and D are object, spark things differently and does not care if I persist or repartition or not.
to Spark it looks like this:

dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....


To get rid of the persisting not working on DF's I could "break" the lineage when converting the DF to and RDD and back again.
This has a little bit of an overhead but an acceptable one (job finishes in minutes rather than hours/never)
I'll run some more tests on the persisting and formulate an answer in the form of a workaround.

This only seems to fix these issues on the surface. In reality I'm back at square one and get OOM exceptions
java.lang.OutOfMemoryError: GC overhead limit exceeded

Answer Source

So the following is what I ended up using. It's performant enough for my usecase, it works and does not need persisting.

It is very much a workaround rather than a fix.

val mutableBufferArray = ArrayBuffer[DataFrame]()

for loop {

              val interm = mergeDataFrame(df, mutableBufferArray.last)
              val intermSchema = interm.schema
              val intermRDD = interm.rdd.repartition(8)

              mutableBufferArray.append(hiveContext.createDataFrame(intermRDD, intermSchema))


This is how I wrestle tungsten into compliance. By going from a DF to an RDD and back I end up with a real object rather than a whole tungsten generated process pipe from front to back.

In my code I iterate a few times before writing out to disk (50-150 iterations seem to work best). That's where I clear out the bufferArray again to start over fresh.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download