Scala Question

hBaseRDD.collect() giving an error

I am working with Spark and HBase. I used HBaseTest.scala.

rdd.count() gives an accurate result, but when I call rdd.collect() I get the following error:

java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


I am not able to figure out the issue. I want to print some rows of the HBase table.
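
Roughly what I am running, based on the HBaseTest.scala example (a sketch; the table name and SparkContext setup are assumed):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Read an HBase table as an RDD of (row key, result) pairs
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test_table")  // table name assumed

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

hBaseRDD.count()    // returns the expected row count
hBaseRDD.collect()  // fails with NotSerializableException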

Answer

I had the same problem and found the solution here.

collect() has to serialize every record and ship it back to the driver, but the key class returned by newAPIHadoopRDD, org.apache.hadoop.hbase.io.ImmutableBytesWritable, is a Hadoop Writable and does not implement java.io.Serializable, hence the exception. The fix is to map each record into plain, serializable Scala types (Strings and immutable Maps) before collecting.

Here is a code snippet:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Raw HBase row layout: column family -> qualifier -> timestamp -> value
type HBaseRow = java.util.NavigableMap[Array[Byte], java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// The same structure as immutable Scala maps, which are serializable
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]
// The same structure with the byte arrays decoded to Strings for printing
type CFTimeseriesRowStr = Map[String, Map[String, Map[Long, String]]]

// Decode every byte array in a row to a String
def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr = navMap.map(cf =>
  (Bytes.toString(cf._1), cf._2.map(col =>
    (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))

// Convert the Java NavigableMaps returned by HBase into plain Scala maps
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)  // tableName defined elsewhere

// Convert each (ImmutableBytesWritable, Result) pair into serializable Scala
// types before anything is sent back to the driver
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
  .map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))

hBaseRDD.take(10).foreach(println)
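
If you only need a few specific columns rather than the whole row map, you can also map each Result down to just those values before collecting. A minimal sketch, assuming a column family "cf" and qualifier "col" (both placeholder names):

// Extract one column per row; the resulting Strings are serializable,
// so take()/collect() work without touching the Writable classes.
val singleColRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
  .map { case (key, result) =>
    val rowKey = Bytes.toString(key.get())
    // "cf" and "col" are placeholder family/qualifier names
    val value = Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
      .map(v => Bytes.toString(v))
      .getOrElse("")
    (rowKey, value)
  }

singleColRDD.take(10).foreach(println)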