I am seeing some performance issues while running queries using DataFrames. In my research I have seen that long-running final tasks can be a sign that data is not distributed optimally, but I have not found a detailed process for resolving this issue.
I start off by loading two tables as DataFrames, and I then join those tables on one field. I have tried adding distribute by (`repartition`) and sort by to improve performance, but I am still seeing a single long-running final task. Here is a simplified version of my code. Note that queries one and two are not actually this simple and use UDFs to calculate some values.
I have tried a few different settings for
```scala
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
val distributeDf2 = df2

val df3 = sqlContext
  .sql("""select * from df1
    left outer join df2 on
    df1.userId = df2.userId""")
```
You clearly have a problem with a huge right data skew. Let's take a look at the statistics you've provided:
```
df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088]
df2 = [mean=1.0, stddev=0.0, count=18408194]
```
With a mean around 5 and a standard deviation over 2000, you get a long tail.
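To see how extreme that tail is, here is a rough bound worked out from those two numbers alone. It assumes (my reading of the output, not something the question states) that the df1 figures are the mean $\mu$ and population standard deviation $\sigma$ of the per-`userId` row counts $x_1, \dots, x_n$ over $n = 2{,}400{,}088$ keys, so every $x_i \ge 0$; with $n$ this large the sample standard deviation gives the same numbers.

$$
n\sigma^2 = \sum_{i=1}^{n} (x_i-\mu)^2 \le \max_i |x_i-\mu| \sum_{i=1}^{n} |x_i-\mu| \le \max_i |x_i-\mu| \cdot 2n\mu
$$

$$
\max_i x_i \ge \mu + \frac{\sigma^2}{2\mu} \approx 4.99 + \frac{2255.65^2}{2 \times 4.99} \approx 5.1 \times 10^{5},
\qquad
\max_i x_i \le \mu + \sigma\sqrt{n} \approx 4.99 + 2255.65 \times 1549.2 \approx 3.5 \times 10^{6}
$$

The last step of the first line uses $\sum_i |x_i-\mu| \le \sum_i (x_i+\mu) = 2n\mu$, and the largest deviation has to be positive because $\mu - x_i \le \mu \approx 5$. Under these assumptions the heaviest `userId` holds somewhere between roughly half a million and 3.5 million rows, while a typical key has about 5. A key like that, hashed into one partition, is enough on its own to produce a single long-running task.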
Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.
Furthermore, your description suggests that the problem may be caused by a single key, or a few keys, that hash to the same partition.
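A toy model of that effect, in plain Scala with no Spark involved: every row of a key goes to the same partition, so one heavy key sets the duration of one task no matter how many partitions you ask for. The key counts, the partition count of 200 and the use of `String.hashCode` are all assumptions for illustration (Spark's DataFrame `repartition` uses a Murmur3-based hash, but the one-key-one-partition property is the same).

```scala
// Hypothetical sketch (plain Scala, no Spark) of rows per partition after
// partitioning by key. String.hashCode stands in for Spark's partitioning hash.
object PartitionLoadSketch {
  val numPartitions = 200 // hypothetical partition count

  // Non-negative modulo, so negative hash codes still map to a valid partition.
  def partitionOf(key: String): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Hypothetical counts: 10,000 users with 5 rows each and one user with 500,000 rows.
  val keyCounts: Map[String, Long] =
    (1 to 10000).map(i => s"user$i" -> 5L).toMap + ("hotUser" -> 500000L)

  // Total rows landing in each partition.
  val load: Map[Int, Long] =
    keyCounts.toSeq
      .groupBy { case (k, _) => partitionOf(k) }
      .map { case (p, kvs) => p -> kvs.map(_._2).sum }

  val totalRows: Long = keyCounts.values.sum
  val maxLoad: Long = load.values.max
  val avgLoad: Double = totalRows.toDouble / numPartitions

  def main(args: Array[String]): Unit =
    println(f"average rows per partition: $avgLoad%.0f, largest partition: $maxLoad")
}
```

With these made-up counts the partition holding `hotUser` carries more than 100 times the average load, which is exactly the "one slow final task" pattern.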
So, let's first identify outliers (pseudocode):
```scala
import sqlContext.implicits._ // for the $"col" syntax

val mean = 4.989209978967438
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
val counts = df1.groupBy("userId").count.cache

val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))
```
and the rest:
```scala
val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))
```
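The mean and standard deviation in these queries are hard-coded from your statistics. If you would rather derive the cut-off from the data, the logic is the following. It is a plain-Scala sketch over a hypothetical in-memory `counts` map standing in for the `counts` DataFrame, and it uses the sample standard deviation, which is what Spark's `stddev` aggregate computes.

```scala
// Hypothetical sketch (plain Scala, no Spark): derive the outlier cut-off from
// per-key counts instead of hard-coding mean and sd.
object OutlierSplitSketch {
  // Stand-in for the `counts` DataFrame: userId -> number of rows in df1.
  val counts: Map[String, Long] =
    (1 to 1000).map(i => s"user$i" -> (1L + i % 9)).toMap ++
      Map("hotA" -> 200000L, "hotB" -> 150000L)

  val n: Double = counts.size.toDouble
  val mean: Double = counts.values.map(_.toDouble).sum / n
  // Sample standard deviation (divides by n - 1), as Spark's `stddev` does.
  val sd: Double =
    math.sqrt(counts.values.map(c => math.pow(c - mean, 2)).sum / (n - 1))

  // Same rule as the queries above: more than two standard deviations above the mean.
  val threshold: Double = mean + 2 * sd
  val (frequent, infrequent) = counts.partition { case (_, c) => c > threshold }
}
```

On the real data the same two numbers can come from `counts.agg(avg("count"), stddev("count"))`. Keep in mind that a few huge keys inflate `sd` themselves, so the factor of 2 may need tuning.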
Is this distribution really expected? If not, try to identify the source of the issue upstream.
If it is expected, you can try:
broadcasting the smaller table:
```scala
import org.apache.spark.sql.functions.broadcast

val df2 = sqlContext.sql("Select * from Table2")
df2.join(broadcast(df1), Seq("userId"), "rightouter")
```
splitting, unifying (`union`), and broadcasting only the frequent keys:
```scala
df2.join(broadcast(frequent), Seq("userId"), "rightouter")
  .union(df2.join(infrequent, Seq("userId"), "rightouter")) // on Spark 1.x use unionAll
```
salting `userId` with some random data
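Salting, sketched in plain Scala under the same assumptions as the statistics (hypothetical data, df1 skewed on one `userId`, df2 with one row per `userId`): every row of the skewed side gets a random salt in `0 until saltBuckets`, every row of the other side is replicated once per salt, and the join key becomes `(userId, salt)`. The hot key's rows are then spread over up to `saltBuckets` groups instead of one, at the cost of multiplying the other side by `saltBuckets`. The value `saltBuckets = 8` and the seed are arbitrary choices; in Spark the same steps would typically use a `rand()`-based salt column on one side and `explode` on the other.

```scala
import scala.util.Random

// Hypothetical sketch (plain Scala, no Spark) of salting a skewed join key.
object SaltingSketch {
  val saltBuckets = 8      // hypothetical: number of pieces each key is split into
  val rnd = new Random(42) // fixed seed so the sketch is reproducible

  // Skewed, preserved side (stands in for df1): userId 1 is hot.
  val left: Seq[(Int, String)] =
    Seq.tabulate(800)(i => (1, s"hot$i")) ++ (2 to 20).map(i => (i, s"row$i"))
  // Other side (stands in for df2): one row per userId.
  val right: Seq[(Int, String)] = (1 to 15).map(i => (i, s"profile$i"))

  // 1. Tag every row of the skewed side with a random salt.
  val saltedLeft: Seq[((Int, Int), String)] =
    left.map { case (k, v) => ((k, rnd.nextInt(saltBuckets)), v) }

  // 2. Replicate every row of the other side once per salt value.
  val saltedRight: Map[(Int, Int), String] =
    right.flatMap { case (k, v) => (0 until saltBuckets).map(s => ((k, s), v)) }.toMap

  // 3. Join on (userId, salt), keeping unmatched left rows (left outer semantics).
  val joined: Seq[(Int, String, Option[String])] =
    saltedLeft.map { case ((k, s), v) => (k, v, saltedRight.get((k, s))) }

  // Size of each (hot key, salt) group: the 800 hot rows are split across buckets.
  val hotGroupSizes: Map[Int, Int] =
    saltedLeft.filter(_._1._1 == 1).groupBy(_._1._2).map { case (s, rows) => s -> rows.size }
}
```

The join result is the same as without salting (every hot row still finds `profile1`, keys 16 to 20 stay unmatched); only the grouping used to distribute the work changes.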
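One caveat with the two `broadcast` variants: Spark can only broadcast the non-preserved side of an outer join (the right input of a left outer join, the left input of a right outer join). In `df2.join(broadcast(df1), Seq("userId"), "rightouter")` and `df2.join(broadcast(frequent), Seq("userId"), "rightouter")` the broadcast input is the preserved one, so the hint cannot be honoured and Spark plans a shuffle-based join instead; `explain()` shows which join was picked. With df2 holding one row per `userId`, the part that can be broadcast cheaply is the handful of df2 rows matching the frequent keys. The plain-Scala sketch below (hypothetical data and names, no Spark) models that split and checks that it returns the same rows as the unsplit left outer join of df1 with df2.

```scala
// Hypothetical sketch (plain Scala, no Spark): split a skewed left outer join
// into a "frequent keys" part and an "everything else" part, then union them.
object SplitJoinSketch {
  type Row = (Int, String)                    // (userId, payload)
  type Joined = (Int, String, Option[String]) // (userId, df1 payload, df2 payload)

  // Hypothetical data: df1 is skewed on userId 1, df2 has one row per userId.
  val df1: Seq[Row] = Seq.fill(1000)((1, "hot")) ++ (2 to 50).map(i => (i, s"row$i"))
  val df2: Seq[Row] = (1 to 40).map(i => (i, s"profile$i"))

  val hotKeys: Set[Int] = Set(1) // e.g. the keys found by the outlier query

  // Hash-based left outer join: `right` is the build side (what Spark would
  // broadcast), `left` is the streamed, preserved side.
  def leftOuter(left: Seq[Row], right: Seq[Row]): Seq[Joined] = {
    val lookup: Map[Int, Seq[Row]] = right.groupBy(_._1)
    left.flatMap { case (k, v) =>
      lookup.get(k) match {
        case Some(matches) => matches.map(r => (k, v, Option(r._2)))
        case None          => Seq((k, v, Option.empty[String]))
      }
    }
  }

  // Frequent part: only the df2 rows for hot keys form the build side (tiny).
  val frequentPart: Seq[Joined] =
    leftOuter(df1.filter(r => hotKeys(r._1)), df2.filter(r => hotKeys(r._1)))

  // Infrequent part: the remaining keys are evenly spread, so a regular join is fine.
  val infrequentPart: Seq[Joined] =
    leftOuter(df1.filterNot(r => hotKeys(r._1)), df2.filterNot(r => hotKeys(r._1)))

  val combined: Seq[Joined] = frequentPart ++ infrequentPart
  val direct: Seq[Joined] = leftOuter(df1, df2)
}
```

Splitting works here because a `userId` is either hot or not, so no row can be matched, or reported as unmatched, in both parts.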
but you shouldn't: