Scala Question

Spark join array

I am brand new to Spark (hours) and rather inexperienced with Scala. However, I have a long-standing desire to become more familiar with both.

I have a rather trivial task. I have two dataframes that I am importing from two JSON files: one with

uuid, text, tag_ids

and the other with the tags:

id, term

I would like to produce a new JSON file that I can import into Solr, containing uuid, text, tag_ids, tag_terms.

val text = spark.read.json("/tmp/text.js")
val tags = spark.read.json("/tmp/tags.js")



text.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- tag_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)

tags.printSchema()
root
 |-- id: string (nullable = true)
 |-- term: string (nullable = true)


#desired output
+--------------------+------+---------+------------+
| uuid| text | tag_ids | tag_terms|
+--------------------+------+---------+------------+
|cf5c1f4c-96e6-4ca...| foo | [1,2]| [tag1,tag2]|
|c9834e2e-0f04-486...| bar | [2,3]| [tag2,tag3]|
+--------------------+------+---------+------------+
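
For experimenting without the JSON files, two DataFrames with the same shape can be mocked up in the spark-shell; the uuid and term values below are made up, just to have something to join:

import spark.implicits._

// Same schema as the real inputs: uuid/text/tag_ids and id/term
val text = Seq(
  ("uuid-1", "foo", Seq("1", "2")),
  ("uuid-2", "bar", Seq("2", "3"))
).toDF("uuid", "text", "tag_ids")

val tags = Seq(
  ("1", "tag1"),
  ("2", "tag2"),
  ("3", "tag3")
).toDF("id", "term")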


It is difficult to show everything I have tried. Essentially .join() has issues with tag_ids being an array. I can explode() tag_ids and join against the tags to pick up the terms, but reassembling the result into a new DataFrame to export is still beyond my level.

Answer

Solution using explode:

import org.apache.spark.sql.functions.{explode, first, collect_list}
import spark.implicits._

val result = text
  .withColumn("tag_id", explode($"tag_ids"))   // one row per (uuid, tag_id) pair
  .join(tags, $"tag_id" === $"id")             // look up the term for each tag id
  .groupBy("uuid", "tag_ids")                  // collapse back to one row per uuid
  .agg(first("text") as "text", collect_list("term") as "tag_terms")
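
To produce the JSON file for Solr, the result can then be written back out. A minimal sketch; note that Spark writes a directory of part-*.json files rather than a single file, and the output path here is just an example:

result.show()                          // sanity-check against the desired output
result.write.json("/tmp/text_tags")    // JSON lines, one object per row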