Alex Alex - 3 years ago
Scala Question

Writing DataFrame to MemSQL Table in Spark

I'm trying to load a .parquet file into a MemSQL database with Spark and the MemSQL connector.

package com.memsql.spark

import com.memsql.spark.context._

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.memsql.spark.connector._
import com.mysql.jdbc._

object readParquet {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ReadParquet")
    val sc = new SparkContext(conf)
    sc.addJar("/data/applications/spark-1.5.1-bin-hadoop2.6/lib/mysql-connector-java-5.1.37-bin.jar")
    sc.addJar("/data/applications/spark-1.5.1-bin-hadoop2.6/lib/memsql-connector_2.10-1.1.0.jar")
    Class.forName("com.mysql.jdbc.Driver")

    val host = "xxxx"
    val port = 3306
    val dbName = "WP1"
    val user = "root"
    val password = ""
    val tableName = "rt_acc"

    val memsqlContext = new com.memsql.spark.context.MemSQLContext(sc, host, port, user, password)

    val rt_acc = memsqlContext.read.parquet("tachyon://localhost:19998/rt_acc.parquet")
    val func_rt_acc = new com.memsql.spark.connector.DataFrameFunctions(rt_acc)
    func_rt_acc.saveToMemSQL(dbName, tableName, host, port, user, password)
  }
}


I'm fairly certain that Tachyon is not causing the problem, as the same exceptions occur when the file is loaded from disk, and I can run SQL queries against the DataFrame.
I've seen people suggest df.saveToMemSQL(..), but it seems this method now lives in DataFrameFunctions.

Also, the table doesn't exist yet, but saveToMemSQL should issue a CREATE TABLE, as the documentation and source code tell me.

Edit: OK, I guess I misread something. saveToMemSQL doesn't create the table. Thanks.
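Given that saveToMemSQL expects the target table to already exist, one workaround is to create it first over the MySQL wire protocol (which MemSQL speaks) before saving. A minimal sketch using plain JDBC, assuming the connection details from the code above and a hypothetical two-column schema that would need to match the DataFrame being saved:

```scala
import java.sql.DriverManager

// Create the target table before calling saveToMemSQL.
// The column list below is a hypothetical example; it must match
// the schema of the DataFrame you intend to save.
val url = s"jdbc:mysql://$host:$port/$dbName"
val conn = DriverManager.getConnection(url, user, password)
try {
  val stmt = conn.createStatement()
  stmt.execute(
    """CREATE TABLE IF NOT EXISTS rt_acc (
      |  id INT PRIMARY KEY,
      |  value DOUBLE
      |)""".stripMargin)
  stmt.close()
} finally {
  conn.close()
}
```

Running this against a live cluster requires the MySQL JDBC driver already added via sc.addJar above.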

Answer Source

Try using createMemSQLTableAs instead of saveToMemSQL.
saveToMemSQL loads a DataFrame into an existing table, whereas createMemSQLTableAs creates the table and then loads it. It also returns a handy DataFrame wrapping that MemSQL table :).
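In the question's code this amounts to a one-line change; a sketch, assuming createMemSQLTableAs in connector 1.1.0 accepts the same connection arguments as the saveToMemSQL call shown above:

```scala
val func_rt_acc = new com.memsql.spark.connector.DataFrameFunctions(rt_acc)

// createMemSQLTableAs issues the CREATE TABLE itself and returns a
// DataFrame backed by the newly created MemSQL table.
val rt_acc_table = func_rt_acc.createMemSQLTableAs(
  dbName, tableName, host, port, user, password)
```

The returned DataFrame can then be queried directly instead of re-reading the table from MemSQL.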
