user3930663 user3930663 - 1 month ago 11
Scala Question

Writing to HBase via Spark: Task not serializable

I'm trying to write some simple data in HBase (0.96.0-hadoop2) using Spark 1.0 but I keep getting getting serialization problems. Here is the relevant code:

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put

object PutRawDataIntoHbase{
def main(args: Array[String]): Unit = {
var propFileName = "hbaseConfig.properties"
if(args.size > 0){
propFileName = args(0)
}

/** Load properties here **/
val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
.map(l => l.split("\t"))
.map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))

val tableName = prop.getProperty("hbase.table.name")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
val myTable = new HTable(hbaseConf, tableName)
theData.foreach(a=>{
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
})
}
}


Running the code results in:

Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable


Replacing the foreach with map doesn't crash but I doesn't write either.
Any help will be greatly appreciated.

Answer

The class HBaseConfiguration represents a pool of connections to HBase servers. Obviously, it can't be serialized and sent to the worker nodes. Since HTable uses this pool to communicate with the HBase servers, it can't be serialized too.

Basically, there are three ways to handle this problem:

Open a connection on each of worker nodes.

Note the use of foreachPartition method:

val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  <... configure HBase ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
   var p = new Put(Bytes.toBytes(a(0)))
   p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
}

Note that each of worker nodes must have access to HBase servers and must have required jars preinstalled or provided via ADD_JARS.

Also note that since the connection pool if opened for each of partitions, it would be a good idea to reduce the number of partitions roughly to the number of worker nodes (with coalesce function). It's also possible to share a single HTable instance on each of worker nodes, but it's not so trivial.

Serialize all data to a single box and write it to HBase

It's possible to write all data from an RDD with a single computer, even if it the data doesn't fit to memory. The details are explained in this answer: Spark: Best practice for retrieving big data from RDD to local machine

Of course, it would be slower than distributed writing, but it's simple, doesn't bring painful serialization issues and might be the best approach if the data size is reasonable.

Use HadoopOutputFormat

It's possible to create a custom HadoopOutputFormat for HBase or use an existing one. I'm not sure if there exists something that fits your needs, but Google should help here.

P.S. By the way, the map call doesn't crash since it doesn't get evaluated: RDDs aren't evaluated until you invoke a function with side-effects. For example, if you called theData.map(....).persist, it would crash too.