
Spark 2.0 Possible Bug on DataFrame Initialization

There is a possible bug produced by the following code:

from pyspark.sql import Row
from pyspark.sql import types

_struct = [
    types.StructField('string_field', types.StringType(), True),
    types.StructField('long_field', types.LongType(), True),
    types.StructField('double_field', types.DoubleType(), True)
]
_rdd = sc.parallelize([Row(string_field='1', long_field=1, double_field=1.1)])
_schema = types.StructType(_struct)
_df = sqlContext.createDataFrame(_rdd, schema=_schema)
_df.take(1)


The expected output is a DataFrame with one row.

Instead, I receive the following error:

DoubleType can not accept object '1' in type <type 'str'>


PS: I'm using Spark 2.0 compiled against Scala 2.10.

Edit

Thanks to the answerer's suggestion, I now understand what is happening. In short, make sure the struct fields are sorted by name. The following code demonstrates this:

from pyspark.sql import Row
from pyspark.sql import types as SparkTypes

# This doesn't work: Row(**kwargs) sorts its fields by name,
# so the data no longer lines up with the unsorted schema.
_struct = [
    SparkTypes.StructField('string_field', SparkTypes.StringType(), True),
    SparkTypes.StructField('long_field', SparkTypes.LongType(), True),
    SparkTypes.StructField('double_field', SparkTypes.DoubleType(), True)
]
_rdd = sc.parallelize([Row(string_field='1', long_field=1, double_field=1.1)])
_schema = SparkTypes.StructType(_struct)
_df = sqlContext.createDataFrame(_rdd, schema=_schema)
_df.take(1)  # raises: DoubleType can not accept object '1' in type <type 'str'>

# But this will work, since the schema is sorted by field name:
_struct = sorted([
    SparkTypes.StructField('string_field', SparkTypes.StringType(), True),
    SparkTypes.StructField('long_field', SparkTypes.LongType(), True),
    SparkTypes.StructField('double_field', SparkTypes.DoubleType(), True)
], key=lambda x: x.name)
params = {'string_field': '1', 'long_field': 1, 'double_field': 1.1}
_rdd = sc.parallelize([Row(**params)])
_schema = SparkTypes.StructType(_struct)
_df = sqlContext.createDataFrame(_rdd, schema=_schema)
_df.take(1)
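
A variant of the same fix (my own sketch, not from the original post): derive the field order from a single sorted name-to-type mapping, so the schema and the Row built from the same keys can never drift apart. The type_map dictionary here is a hypothetical helper mirroring the fields above.

from pyspark.sql import types as SparkTypes

# Hypothetical mapping from field name to Spark type.
type_map = {
    'string_field': SparkTypes.StringType(),
    'long_field': SparkTypes.LongType(),
    'double_field': SparkTypes.DoubleType(),
}
# Build the schema in sorted-name order, matching Row(**kwargs) sorting.
_struct = [SparkTypes.StructField(name, type_map[name], True)
           for name in sorted(type_map)]
_schema = SparkTypes.StructType(_struct)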

Answer

This looks like a change of behavior between 1.x and 2.x, but I doubt it is a bug. In particular, when you create a Row object with kwargs (named arguments), the fields are sorted by name. Let's illustrate that with a simple example:

Row(string_field='1', long_field=1, double_field=1.1)
## Row(double_field=1.1, long_field=1, string_field='1')

As you can see, the order of the fields changes and no longer matches the schema.
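
If you want to keep an explicit field order instead of sorting the schema, one option (a sketch on my part, not part of the original answer) is to build a reusable Row class with positional fields, which preserves the order you declare:

from pyspark.sql import Row

# Row(...) called with plain field names creates a row class;
# instances built from it keep the declared order instead of being sorted.
MyRow = Row("string_field", "long_field", "double_field")
MyRow('1', 1, 1.1)
## Row(string_field='1', long_field=1, double_field=1.1)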

Prior to 2.0.0, Spark verified types only if the data argument for createDataFrame was a local data structure. So the following code:

sqlContext.createDataFrame(
    data=[Row(string_field='1', long_field=1, double_field=1.1)], 
    schema=_schema
)

would fail in 1.6 as well.

Spark 2.0.0 introduced verification for RDD inputs as well, providing consistent behavior between local and distributed inputs.
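
As a side note (my own sketch, not from the original answer), passing plain tuples instead of Row objects sidesteps the kwargs sorting entirely, because tuples are matched to the schema positionally:

from pyspark.sql import types

_schema = types.StructType([
    types.StructField('string_field', types.StringType(), True),
    types.StructField('long_field', types.LongType(), True),
    types.StructField('double_field', types.DoubleType(), True)
])
# Tuples are mapped to the schema by position, so no sorting is involved.
_df = sqlContext.createDataFrame(sc.parallelize([('1', 1, 1.1)]), schema=_schema)
_df.take(1)
## [Row(string_field='1', long_field=1, double_field=1.1)]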
