Brady Auen - 3 months ago
Scala Question

Can I optimize this? Programmatically prepare two DataFrames for a union

This is under the understanding that withColumn can only take one column at a time (if I'm wrong there I'm going to be embarrassed). I'm worried about the memory performance of this approach, because the DataFrames are likely to be very large in production. Essentially, the idea is to take the union of the two column arrays (Array[String]), call distinct on the result, and foldLeft over that set, updating the accumulated DataFrames as I go. I'm looking for a programmatic way to match the columns of the two DataFrames so I can perform a union afterwards.
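The column bookkeeping described above only touches schema metadata (the name arrays from `df.columns`), so it runs on the driver and its cost does not grow with the number of rows. A minimal plain-Scala sketch of that step, assuming the names are already normalized (for example lowercased); `ColumnUnion`, `allColumns`, and `missing` are hypothetical names:

```scala
object ColumnUnion {
  // Ordered, de-duplicated union of two column lists; the first occurrence of a name wins.
  def allColumns(left: Seq[String], right: Seq[String]): Seq[String] =
    (left ++ right).distinct

  // Names in `all` that `have` lacks, i.e. the columns that need a null placeholder.
  def missing(all: Seq[String], have: Seq[String]): Seq[String] = {
    val present = have.toSet // constant-time lookups instead of a linear contains per name
    all.filterNot(present)
  }
}
```

With real DataFrames the inputs would be something like `lowerCaseDF.columns.toSeq` and `masterDF.columns.toSeq`. Using a `Set` for the lookup is a small nicety; with typical column counts the difference is negligible.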

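import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Every column name that appears in either DataFrame, without duplicates.
val allColumns = lowerCaseDFColumns.union(masterDFColumns).distinct

val (newLowerCaseDF, newMasterDF): (DataFrame, DataFrame) =
  allColumns.foldLeft((lowerCaseDF, masterDF)) { case ((lower, master), value) =>
    if (!lowerCaseDFColumns.contains(value)) {
      // Column exists only in masterDF: add a null placeholder to lowerCaseDF.
      // lit(null) is used because lit(None) is not a supported literal type.
      (lower.withColumn(value, lit(null)), master)
    } else if (!masterDFColumns.contains(value)) {
      // Column exists only in lowerCaseDF: add a null placeholder to masterDF.
      (lower, master.withColumn(value, lit(null)))
    } else {
      // Column exists in both DataFrames: nothing to add.
      (lower, master)
    }
  }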

Answer

I found out that it's possible to select hard-coded null columns, so each DataFrame can be aligned with a single select instead of repeated withColumn calls. My new solution is:

import org.apache.spark.sql.functions.{col, lit}

val masterExprs = lowerCaseDFColumns.union(lowerCaseMasterDFColumns).distinct.map(field =>
  // if the field already exists in the master schema, we add the name to our select statement
  if (lowerCaseMasterDFColumns.contains(field)) {
    col(field.toLowerCase)
  }
  // else, we hardcode a null column in for that name
  else {
    lit(null).alias(field.toLowerCase)
  }
)

val inputExprs = lowerCaseDFColumns.union(lowerCaseMasterDFColumns).distinct.map(field =>
  // if the field already exists in the input schema, we add the name to our select statement
  if (lowerCaseDFColumns.contains(field)) {
    col(field.toLowerCase)
  }
  // else, we hardcode a null column in for that name
  else {
    lit(null).alias(field.toLowerCase)
  }
)
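`masterExprs` and `inputExprs` differ only in which column list they check, so the logic can live in one function applied to each side. Newer versions of Spark's documentation for `withColumn` also note that each call adds a projection and recommend a single `select` with multiple columns when adding many, which is what this approach does. A minimal sketch, assuming Spark SQL expression strings passed to `Dataset.selectExpr` instead of `Column` objects so the helper stays plain Scala; `SelectList` and `selectList` are hypothetical names:

```scala
object SelectList {
  // Backtick-quote a name for Spark SQL, doubling any embedded backticks.
  private def quote(name: String): String = "`" + name.replace("`", "``") + "`"

  // One select-list entry per name in `all`, in that order: the column itself
  // if `have` contains it, otherwise a NULL literal aliased to that name.
  def selectList(all: Seq[String], have: Seq[String]): Seq[String] = {
    val present = have.toSet
    all.map { name =>
      if (present.contains(name)) quote(name)
      else s"NULL AS ${quote(name)}"
    }
  }
}
```

Assuming `all` is the shared ordered column list, each side would then be prepared with something like `masterDF.selectExpr(SelectList.selectList(all, masterDF.columns.toSeq): _*)`, and likewise for `lowerCaseDF`.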

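Because both expression lists are built from the same ordered list of column names, the columns line up position by position, so you can then do the union like so: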

masterDF.select(masterExprs: _*).union(lowerCaseDF.select(inputExprs: _*))
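`Dataset.union` resolves columns by position rather than by name and keeps duplicate rows, like SQL `UNION ALL`. The plain-Scala model below (hypothetical `PositionalUnion`, with rows as `Map`s and `None` standing in for `lit(null)`) illustrates what the two selects followed by the union produce. As an alternative, Spark 2.3+ offers `unionByName`, which matches columns by name, and Spark 3.1+ adds `unionByName(other, allowMissingColumns = true)`, which also fills missing columns with nulls and may make the manual alignment unnecessary.

```scala
object PositionalUnion {
  // Project a row onto the full ordered column list; columns the row lacks
  // become None, the role lit(null) plays in the Spark version.
  def align(row: Map[String, String], all: Seq[String]): Seq[Option[String]] =
    all.map(row.get)

  // After alignment every row has the same width and column order, so a
  // positional union is plain concatenation (duplicate rows are kept).
  def union(
      left: Seq[Seq[Option[String]]],
      right: Seq[Seq[Option[String]]]
  ): Seq[Seq[Option[String]]] =
    left ++ right
}
```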