DJElbow DJElbow - 2 months ago 27
MySQL Question

Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception

Using Spark 1.4.0, I am trying to insert data from a Spark DataFrame into a MemSQL database (which should be exactly like interacting with a MySQL database) using insertIntoJdbc(). However I keep getting a Runtime TableAlreadyExists exception.

First I create the MemSQL table like this:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);


Then I create a simple dataframe in Spark and try to insert into MemSQL like this:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.

Answer

This solution applies to general JDBC connections, although the answer by @wayne is probably a better solution for memSQL specifically.

insertIntoJdbc seems to have been deprecated as of 1.4.0, and using it actually calls write.jdbc().

write() returns a DataFrameWriter object. If you want to append data to your table you will have to change the save mode of the object to "append".

Another issue with the example in the question above is the DataFrame schema didn't match the schema of the target table.

The code below gives a working example from the Spark shell. I am using spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar to start my spark-shell session.

import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)