
Loading large data from MySQL into Spark

Looking for some understanding of what Spark is doing here...

I am loading large amounts of data from MySQL into Spark, and it keeps dying :-(

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)


Here is my code

val query =
  s"""|(
      |  select
      |    mod(act.AccountID, ${parts}) part,
      |    p.Value name,
      |    event.EventTime eventTime,
      |    act.AccountID accountID,
      |    act.UserGoal goalID,
      |    event.ActivityID activityID,
      |    id.CountryID countryID,
      |    arr.ConsumerID consumerID
      |  from DimIdentity as id
      |  join FactArrival as arr on arr.IdentityID = id.IdentityID
      |  join FactActivityEvent as event on event.ArrivalID = arr.ArrivalID
      |  join DimAccount as act on act.AccountID = event.AccountID
      |  join DimAccountRoleTypeMatch as role on role.AccountID = act.AccountID
      |  join DimDateTime as d on event.DateTimeID = d.DateTimeID
      |  join DimProperty as p on p.PropertyID = event.EventTypeID
      |  where
      |    id.Botness = 0 and
      |    d.DayOfYear >= ${from} and d.DayOfYear < ${to} and d.Year = ${year} and
      |    (role.AccountRoleTypeID = 1 or role.AccountRoleTypeID = 2)
      |) a
      |""".stripMargin

val events = sqlContext.read.format("jdbc").
  option("url", sqlURL).
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("zeroDateTimeBehavior", "round").
  option("continueBatchOnError", "true").
  option("useSSL", "false").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load().as[Activity].toDF


Note that I am using partitionColumn, lowerBound, upperBound, and numPartitions, as recommended in other answers.
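As I understand it, these four options only control how Spark splits the query into parallel sub-queries: the bounds are used to compute a stride on the partition column, not to filter rows. One alternative I am aware of (untested on my side) is the DataFrameReader.jdbc overload that takes an explicit predicate per partition, which avoids the stride math entirely. A minimal sketch, reusing the names from my code above (query, parts, sqlURL, sqlUser, sqlPassword):

import java.util.Properties

val props = new Properties()
props.setProperty("user", sqlUser)
props.setProperty("password", sqlPassword)
props.setProperty("driver", "com.mysql.jdbc.Driver")

// One sub-query per partition: "part = 0", "part = 1", ..., "part = parts-1".
// Each Spark task then reads exactly one part value instead of a computed range.
val predicates = (0 until parts).map(p => s"part = $p").toArray

val eventsByPredicate = sqlContext.read.jdbc(sqlURL, query, predicates, props)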

I tried setting the number of partitions anywhere from 4 to 512, but the job always dies. Reading the same amount of data from files or from Mongo works fine. Is this an issue with the MySQL connector? Is there a solution?
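One suggestion I have seen (again, untested on my side) is that MySQL Connector/J materializes the entire result set in memory by default, which can kill executors on large reads, and that server-side cursors (useCursorFetch=true plus a modest fetch size) or streaming mode (a fetch size of Integer.MIN_VALUE) avoids that. Whether Spark's "fetchsize" JDBC option is honoured depends on the Spark version, so treat this as a sketch to experiment with, not a fix:

// Sketch only: same reader as above, with cursor-based fetching.
// Assumes sqlURL already carries query parameters, so the extra flag is appended with '&'.
val eventsCursor = sqlContext.read.format("jdbc").
  option("url", s"${sqlURL}&useCursorFetch=true").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", query).
  option("user", sqlUser).
  option("password", sqlPassword).
  option("fetchsize", "10000").              // rows per round trip with cursor fetch
  option("partitionColumn", "part").
  option("lowerBound", "0").
  option("upperBound", s"${parts - 1}").
  option("numPartitions", s"${parts}").
  load()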

Note that I found one answer that suggests I avoid Spark's JDBC reader, dump the query results into a file on HDFS, and then load that file:

Multiple Partitions in Spark RDD

Is this really the best way?

Answer

Here is the answer I eventually arrived at...

For me, the answer was to avoid the MySQL connector for Spark :-( I found it too difficult to avoid the crashes caused by partitioning. The MySQL connector requires hand-tuning the partitions and does not yield any increase in speed. It was much easier to write non-Spark code that dumps the data into large text files, and then call Spark on the text files. Spark is really good with most data sources, but not MySQL... at least not yet.
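For completeness, here is roughly what the file-based route looks like on the Spark side. This is only a sketch: the HDFS path, the tab delimiter, and the column types are assumptions, chosen to match the columns selected in the query above (the export itself could come from SELECT ... INTO OUTFILE or a small JDBC extractor).

import sqlContext.implicits._

// Hypothetical export location and tab-separated layout matching the SELECT list above.
val events = sc.textFile("hdfs:///exports/activity_events/*.tsv").
  map(_.split("\t")).
  map(f => (f(0).toInt, f(1), f(2), f(3).toLong, f(4).toLong,
            f(5).toLong, f(6).toLong, f(7).toLong)).
  toDF("part", "name", "eventTime", "accountID", "goalID",
       "activityID", "countryID", "consumerID")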