
How to search through struct in Spark 2?

I have the following two DataFrames in Spark 2.2.0 and Scala 2.11.8.

df1 =

+----------+-------------------------------+
|item | other_items |
+----------+-------------------------------+
| 111 |[[444,1.0],[333,0.5],[666,0.4]]|
| 222 |[[444,1.0],[333,0.5]] |
| 333 |[] |
| 444 |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+


Calling printSchema on df1 gives the following output:

|-- item: string (nullable = true)
|-- other_items: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- item: string (nullable = true)
| | |-- rank: double (nullable = true)


And:

df2 =

+----------+-------------+
|itemA | itemB |
+----------+-------------+
| 111 | 333 |
| 222 | 444 |
| 333 | 555 |
| 444 | 777 |
+----------+-------------+


For each pair in df2, I want to find the rank from df1. To do this, I need to find the matching pair in df1, i.e. where df1.item equals df2.itemA and the item field inside the other_items structs equals df2.itemB. If no such pair exists, the rank should be 0.

The expected result is:

+----------+-------------+-------------+
|itemA | itemB | rank |
+----------+-------------+-------------+
| 111 | 333 | 0.5 |
| 222 | 444 | 1.0 |
| 333 | 555 | 0.0 |
| 444 | 777 | 0.2 |
+----------+-------------+-------------+
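
For reference, the inputs can be reproduced with the following minimal sketch (the case class name Other is my own, and an active SparkSession named spark is assumed, as in spark-shell):

import spark.implicits._

// Hypothetical reconstruction of the inputs; the case class name "Other"
// is illustrative, not from the original post. Its fields (item, rank)
// produce exactly the array-of-struct schema shown above.
case class Other(item: String, rank: Double)

val df1 = Seq(
  ("111", Seq(Other("444", 1.0), Other("333", 0.5), Other("666", 0.4))),
  ("222", Seq(Other("444", 1.0), Other("333", 0.5))),
  ("333", Seq.empty[Other]),
  ("444", Seq(Other("111", 2.0), Other("555", 0.5), Other("777", 0.2)))
).toDF("item", "other_items")

val df2 = Seq(
  ("111", "333"), ("222", "444"), ("333", "555"), ("444", "777")
).toDF("itemA", "itemB")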


How can I do it?

Answer

This should do what you want. The trick is to explode other_items before the join:

import org.apache.spark.sql.functions.{coalesce, explode, lit}
import spark.implicits._

// Explode other_items so each (item, rank) struct becomes its own row,
// then left-join df2 against it; missing ranks default to 0.0.
df2.as("df2")
  .join(
    df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
    $"df2.itemA" === $"df1.item" && $"df2.itemB" === $"df1.other_items.item",
    "left"
  )
  .select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
  .show()
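
To see why the left join and coalesce are needed: explode emits one row per array element and drops rows whose array is empty, such as item 333, so pairs like (333, 555) have no match and come back as null before coalesce turns them into 0.0. A small inspection step (the alias "o" is my own, not from the answer) makes the exploded intermediate visible:

import org.apache.spark.sql.functions.explode

// One row per (item, rank) struct; item 333 disappears here because
// explode silently skips empty arrays.
df1.select($"item", explode($"other_items").as("o"))
  .select($"item", $"o.item".as("other_item"), $"o.rank")
  .show()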