
Spark Row to JSON

I would like to create JSON from a Spark v1.6 DataFrame (using Scala). I know that there is the simple solution of doing df.toJSON.
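For illustration, this is roughly what that gives me (a minimal sketch, assuming the sample dataframe below; the exact output formatting may differ):

// toJSON serializes every column of each row into one JSON string,
// e.g. {"A":1,"B":"test","C1":"ab","C2":22,"C3":true},
// so A and B end up inside the JSON instead of staying as separate columns.
df.toJSON.collect().foreach(println)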

However, my problem looks a bit different. Consider for instance a dataframe with the following columns:

| A | B      | C1 | C2 | C3    |
--------------------------------
| 1 | test   | ab | 22 | TRUE  |
| 2 | mytest | gh | 17 | FALSE |


I would like to have at the end a dataframe with

| A | B      | C                                        |
----------------------------------------------------------
| 1 | test   | { "c1" : "ab", "c2" : 22, "c3" : TRUE }  |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |


where C is a JSON containing C1, C2, C3. Unfortunately, at compile time I do not know what the dataframe looks like (except for the columns A and B, which are always "fixed").

As for the reason why I need this: I am using Protobuf for sending around the results. Unfortunately, my dataframe sometimes has more columns than expected, and I would still like to send those via Protobuf without specifying all columns in the definition.

How can I achieve this?

Answer

This approach uses no JSON parser, and it adapts to your schema:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  // keep the two fixed columns A and B as-is
  col(df.columns(0)),
  col(df.columns(1)),
  // build the JSON object from all remaining columns
  concat(
    lit("{"),
    concat_ws(",", df.dtypes.drop(2).map { case (c, t) =>
      // quote values of string columns; leave other types bare
      val quote = if (t == "StringType") "\"" else ""
      concat(lit("\"" + c + "\":" + quote), col(c), lit(quote))
    }: _*),
    lit("}")
  ) as "C"
).collect()
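As a side note: if you can move beyond Spark 1.6, the built-in to_json and struct functions (to_json is available since Spark 2.1) do this directly. A minimal sketch, assuming the same fixed columns A and B:

import org.apache.spark.sql.functions.{col, struct, to_json}

// wrap all columns after A and B in a struct and serialize it to JSON
val rest = df.columns.drop(2).map(col)
df.select(col("A"), col("B"), to_json(struct(rest: _*)) as "C")

This variant also handles quoting, escaping, and nulls for you, which the string-concatenation version above does not (concat returns null if any input column is null).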