David David - 12 days ago 6
Python Question

pyspark using one task for mapPartitions when converting rdd to dataframe

I'm confused as to why Spark appears to use only one task for mapPartitions when the resulting RDD is converted to a DataFrame. This is an issue for me because I would like to go from DataFrame --> mapPartitions --> DataFrame, so that I can read in data (DataFrame), apply a non-SQL function to chunks of data (mapPartitions on the RDD), and then convert back to a DataFrame so that I can use the DataFrame.write process. I am able to go from DataFrame --> mapPartitions and then use an RDD writer like saveAsTextFile, but that is less than ideal since the DataFrame.write process can do things like overwrite and save data in ORC format. So I'd like to learn why this is going on, but from a practical perspective I'm primarily concerned with being able to go from a DataFrame --> mapPartitions --> the DataFrame.write process.

Here is a reproducible example. The following works as expected, with 100 tasks for the mapPartitions work:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .master("yarn-client") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

# 100,000 rows spread over 100 partitions
df = pd.DataFrame({'var1': range(100000), 'var2': [x - 1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    # Ignores the partition's rows and emits one tuple per partition
    return [(1, 2)]

spark_df.rdd.mapPartitions(f).collect()


However, if the last line is changed to something like spark_df.rdd.mapPartitions(f).toDF().show(), then there will be only one task for the mapPartitions work.

Some screenshots illustrating this are below:
[Two Spark UI screenshots]

Answer

DataFrame.show() only displays the first rows of your DataFrame, 20 by default. If that number is smaller than the number of rows per partition, Spark, being lazy, evaluates only a single partition, which corresponds to a single task.

You can also call collect on the DataFrame to compute and collect all partitions, and you will see 100 tasks again.
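
The effect of a row limit on the task count can be illustrated without a cluster. The following is a simplified pure-Python model, not Spark's actual implementation: the function take, the growth factor scale_up, and the callables standing in for partitions are all assumptions made for illustration. It evaluates one partition first and only looks at further partitions while it still lacks rows.

```python
from typing import Callable, List, Sequence, Tuple

Row = Tuple[int, int]
Partition = Callable[[], List[Row]]  # a partition is computed only when called


def take(partitions: Sequence[Partition], n: int, scale_up: int = 4) -> Tuple[List[Row], int]:
    """Return up to n rows plus the number of partitions (tasks) evaluated.

    Simplified model: start with one partition and widen each round by
    scale_up (an assumed growth factor) until n rows are available.
    """
    rows: List[Row] = []
    scanned = 0  # partitions evaluated so far
    batch = 1    # the first round touches a single partition
    while len(rows) < n and scanned < len(partitions):
        for compute in partitions[scanned:scanned + batch]:
            rows.extend(compute())
        scanned = min(scanned + batch, len(partitions))
        batch *= scale_up
    return rows[:n], scanned


# 100 partitions with 1,000 rows each: a 20-row request needs one partition.
large = [lambda: [(1, 2)] * 1000 for _ in range(100)]
_, tasks_for_show = take(large, 20)

# Asking for every row (like collect) evaluates all 100 partitions.
_, tasks_for_collect = take(large, 100 * 1000)

print(tasks_for_show, tasks_for_collect)
```

In this model a small limit is satisfied by the first partition alone, while a request for all rows has to visit every partition, which mirrors the difference between show and collect described above.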

You will still see the runJob task first, as before. It is caused by the toDF call, which has to determine the resulting DataFrame's schema: it needs to process a single partition to determine the output types of your mapping function. After this initial stage, the actual action, such as collect, runs on all partitions. For instance, for me, running your snippet with the last line replaced with spark_df.rdd.mapPartitions(f).toDF().collect() results in these stages:

[Screenshot of the resulting stages in the Spark UI]
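
If the extra schema-inference job is unwanted, one option is to hand Spark the schema yourself instead of letting toDF infer it. The following is a minimal sketch under stated assumptions: the helper name map_partitions_to_df, the column names a and b, and the output path are hypothetical, and the DDL-style schema string assumes Spark 2.3 or later. With an explicit schema, createDataFrame should not need to sample a partition first, so the only job is the one triggered by the final action.

```python
# Assumed output schema for the tuples produced by f (column names are hypothetical).
SCHEMA = "a bigint, b bigint"


def f(part):
    # Stand-in for the non-SQL per-partition logic from the question.
    return [(1, 2)]


def map_partitions_to_df(spark, df, func, schema):
    """Apply func to each partition of df and rebuild a DataFrame.

    Passing an explicit schema means the column types do not have to be
    inferred from the data.
    """
    rdd = df.rdd.mapPartitions(func)
    return spark.createDataFrame(rdd, schema=schema)


# Usage with the session and DataFrame from the question (not run here):
# result = map_partitions_to_df(spark, spark_df, f, SCHEMA)
# result.write.mode("overwrite").orc("/tmp/example_output")  # hypothetical path
```

The write call at the end is itself an action that runs over every partition, so the DataFrame --> mapPartitions --> DataFrame.write flow from the question should use all 100 tasks for the mapPartitions work.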
