Ivan Stoyanov - 4 months ago
SQL Question

How to implement NOT IN for two DataFrames with different structure in Apache Spark

I am using Apache Spark in my Java application. I have two DataFrames: df1 and df2. df1 contains Rows with email, firstName and lastName. df2 contains Rows with email.

I want to create a DataFrame, df3, that contains all the rows in df1 whose email is not present in df2.

Is there a way to do this with Apache Spark? I tried to create a JavaRDD<String> from each of df1 and df2 by converting them with toJavaRDD(), mapping df1 down to just its emails, and then using subtract, but I don't know how to map the resulting JavaRDD back to df1 and get a DataFrame.

Basically, I need all Rows in df1 whose email is not in df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql(
    "SELECT email FROM customer_bought_product " +
    "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD()
    .map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
    .map(row -> row.getString(0))
    .subtract(customersBoughtEmail)
    .collect();

Answer

Spark 2.0.0+

You can use NOT IN directly in a SQL query, since Spark SQL supports subqueries in the WHERE clause from 2.0.0 onward.
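
As a minimal sketch of the NOT IN approach, assuming Spark 2.0.0 or later running in local mode and reusing the sample data from the pre-2.0 example in this answer: the object name NotInExample and the helper customersWithoutOrder are hypothetical names chosen for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; the names are illustrative, not part of any API.
object NotInExample {

  // Returns the customers whose email does not appear among the buyers.
  def customersWithoutOrder(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Sample data (assumed for illustration).
    val customers = Seq(
      ("john@example.com", "John", "Doe"),
      ("jane@example.com", "Jane", "Doe")
    ).toDF("email", "first_name", "last_name")

    val customersWhoOrderedTheProduct = Seq("jane@example.com").toDF("email")

    // Register both DataFrames so they can be referenced from SQL.
    customers.createOrReplaceTempView("customers")
    customersWhoOrderedTheProduct.createOrReplaceTempView("customersWhoOrderedTheProduct")

    // NOT IN with an uncorrelated subquery in the WHERE clause.
    spark.sql(
      """SELECT * FROM customers
        |WHERE email NOT IN (SELECT email FROM customersWhoOrderedTheProduct)""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("not-in-example")
      .master("local[*]")
      .getOrCreate()
    customersWithoutOrder(spark).show()
    spark.stop()
  }
}
```

One caveat worth keeping in mind: under standard SQL semantics, NOT IN yields no rows at all if the subquery returns a NULL, so it may be worth filtering out NULL emails in the subquery when the column is nullable.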

Spark < 2.0.0

It can be expressed as a left outer join followed by a filter that keeps only the rows with no match on the right side.

val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
 .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

Raw SQL equivalent:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+
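
For readers on Spark 2.0.0 or later, the join-and-filter pattern above can probably be shortened with a left anti join, which keeps only the left-side rows that have no match on the right. The sketch below assumes the "leftanti" join type string is accepted by the installed version; AntiJoinExample and customersWithoutOrder are hypothetical names, and the data is the same illustrative sample.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example object; the names are illustrative, not part of any API.
object AntiJoinExample {

  // Returns the customers that have no matching email among the buyers.
  def customersWithoutOrder(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Sample data (assumed for illustration).
    val customers = Seq(
      ("john@example.com", "John", "Doe"),
      ("jane@example.com", "Jane", "Doe")
    ).toDF("email", "first_name", "last_name")

    val customersWhoOrderedTheProduct = Seq("jane@example.com").toDF("email")

    // A left anti join returns only the columns of the left side,
    // so no alias, null filter, or drop is needed.
    customers.join(
      customersWhoOrderedTheProduct,
      customers("email") === customersWhoOrderedTheProduct("email"),
      "leftanti")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("anti-join-example")
      .master("local[*]")
      .getOrCreate()
    customersWithoutOrder(spark).show()
    spark.stop()
  }
}
```

Unlike NOT IN, this form behaves like the outer join plus IS NULL filter when the right side contains NULL emails: unmatched left rows are still returned.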