Adam Levine - 4 months ago
Scala Question

Create spark dataframe from arbitrary length csv column

I'm trying to create a new DataFrame from a single CSV-formatted column in my DataFrame. I don't know the schema ahead of time, so I'm trying to use the spark.createDataFrame method without the schema parameter (similar to method 1 in this example).

I'm trying code such as the following, but it fails to compile:

var csvrdd = df.select(df("Body").cast("string")).rdd.map{x:Row => x.getAs[String](0)}.map(x => x.split(",").toSeq)
var dfWithoutSchema = spark.createDataFrame(csvrdd)


Error:

error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[Seq[String]])
var dfWithoutSchema = spark.createDataFrame(csvrdd)

Answer Source

First, the cause of the failure can be seen clearly in the signature of createDataFrame:

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame

The type A is bounded to be a subclass of scala.Product. Your RDD contains Seq[String] (an Array[String] before the .toSeq call), and neither type is such a subclass. If you really want to, you can artificially wrap the array in a Tuple1 (which extends Product) and get this to work:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val csvrdd: RDD[Tuple1[Array[String]]] = df
  .select(df("Body").cast("string"))
  .rdd
  .map { x: Row => x.getAs[String](0) }
  .map(x => Tuple1(x.split(","))) // wrap in a Tuple1, which extends scala.Product

val dfWithoutSchema = spark.createDataFrame(csvrdd) // now this overload works

dfWithoutSchema.printSchema()
// root
//  |-- _1: array (nullable = true)
//  |    |-- element: string (containsNull = true)
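To see the bound in isolation, here is a minimal Spark-free sketch. The describe method and the Person case class are hypothetical names invented for illustration: describe only mimics the A <: Product constraint of createDataFrame and is not part of Spark.

```scala
// Hypothetical stand-in for createDataFrame: it keeps only the Product bound.
def describe[A <: Product](rows: Seq[A]): Seq[Int] =
  rows.map(_.productArity) // number of fields in each row

// Hypothetical record type; every case class extends Product.
case class Person(name: String, age: Int)

val fromTuples    = describe(Seq(("a", 1, true)))                   // Seq(3): Tuple3 is a Product
val fromCaseClass = describe(Seq(Person("x", 30)))                  // Seq(2)
val fromWrapped   = describe(Seq(Tuple1("a,b,c".split(",").toSeq))) // Seq(1): one field holding the whole Seq

// describe(Seq(Seq("a", "b"))) // does not compile: Seq[String] is not a subtype of Product
```

Note that the wrapped variant reports a single field: the Tuple1 satisfies the compiler, but the row still has only one column.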

However, this isn't very useful: it creates a DataFrame with a single column of type ArrayType, which can be achieved much more simply with the split function from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.split

val withArray = df.select(split(df("Body").cast("string"), ",") as "arr")

withArray.printSchema()
// root
//  |-- arr: array (nullable = true)
//  |    |-- element: string (containsNull = true)
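One subtlety when moving from x.split(",") on the RDD to the SQL split function: the plain Scala/Java call drops trailing empty strings, while, as far as I know, Spark's two-argument split behaves like a split with a negative limit and keeps them. The sketch below shows only the plain Scala side of that difference (no Spark involved); the sample line is made up.

```scala
// A made-up record whose last two CSV fields are empty.
val line = "a,b,,"

// Default split: trailing empty strings are removed.
val dropped = line.split(",").toSeq   // Seq("a", "b")

// Negative limit: every field is kept, including trailing empties.
val kept = line.split(",", -1).toSeq  // Seq("a", "b", "", "")
```

If trailing empty fields matter for your data, it is worth checking which of the two behaviours your Spark version gives you before relying on the array length.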

Alternatively, if you want a DataFrame with a separate column for each of the "CSV columns", you have to decide on a common schema for all records, since records are not guaranteed to have the same number of "CSV parts". You can do that with one more scan of the DataFrame that computes the maximum number of columns required, then let Spark fill in the blanks with nulls wherever a record has fewer parts:

import org.apache.spark.sql.functions.{size, split}
import spark.implicits._ // enables the $"..." column syntax

// first - split String into array of Strings
val withArray = df.select(split(df("Body").cast("string"), ",") as "arr")

// optional - calculate the *maximum* number of columns;
// If you know it to begin with (e.g. "CSV cannot exceed X columns") -
// you can skip this and use that known value
val maxLength: Int = withArray.select(size($"arr") as "size")
  .groupBy().max("size")
  .first().getAs[Int](0)

// Create the individual columns, with nulls where the arrays were shorter than maxLength
val columns = (0 until maxLength).map(i => $"arr".getItem(i) as s"col$i")

// select these columns
val result = withArray.select(columns: _*)

result.printSchema() // in my example, maxLength = 4
// root
//  |-- col0: string (nullable = true)
//  |-- col1: string (nullable = true)
//  |-- col2: string (nullable = true)
//  |-- col3: string (nullable = true)
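For experimenting outside spark-shell, the steps above can be put together in a small self-contained sketch. The sample rows, the local master setting and the app name are assumptions made for illustration. The sketch also turns ANSI mode off explicitly, on the assumption that with spark.sql.ansi.enabled set to true an out-of-range getItem may raise an error instead of returning null.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, size, split}

// Local session for experimentation only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("csv-column-sketch")            // hypothetical app name
  .config("spark.sql.ansi.enabled", "false") // assumption: keep null-on-missing-index behaviour
  .getOrCreate()
import spark.implicits._

// Made-up sample rows standing in for the real "Body" column.
val df = Seq("a,b", "c,d,e,f", "g").toDF("Body")

// Split each CSV string into an array of strings.
val withArray = df.select(split(col("Body"), ",") as "arr")

// One extra scan to find the widest record (4 for the sample rows above).
val maxLength = withArray.agg(max(size(col("arr")))).first().getInt(0)

// One column per array position; shorter records yield null for the missing positions.
val columns = (0 until maxLength).map(i => col("arr").getItem(i) as s"col$i")
val result = withArray.select(columns: _*)

result.show()
```

All resulting columns are strings; casting them to other types is a separate step once you know what each position holds.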