Ravi Ranjan - 1 month ago
Scala Question

Putting multiple column families from an HBase table into one Spark RDD

I have to put multiple column families from a table in HBase into one Spark RDD. I am attempting this using the following code: (question edited after first answer)

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._
object HBaseRead {
  def main(args: Array[String]) {
    // Register the Kryo classes before the SparkContext is created;
    // registering them on sparkConf afterwards has no effect.
    val sparkConf = new SparkConf()
      .setAppName("HBaseRead")
      .setMaster("local")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "TableName"

    // setting up required stuff
    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    // A hyphen is not legal in a plain Scala identifier, so `T-shirts` needs backticks.
    case class Model(Shoes: String, Clothes: String, `T-shirts`: String)

    val hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val transformedRDD = hBaseRDD2.map(tuple => {
      val result = tuple._2
      Model(
        Bytes.toString(result.getValue(Bytes.toBytes("Category"), Bytes.toBytes("Shoes"))),
        Bytes.toString(result.getValue(Bytes.toBytes("Category"), Bytes.toBytes("Clothes"))),
        Bytes.toString(result.getValue(Bytes.toBytes("Category"), Bytes.toBytes("T-shirts")))
      )
    })

    val totalcount = transformedRDD.count()
    println(totalcount)
  }
}


What I want is a single RDD in which the values of the first row (and of subsequent rows later on) from these column families are combined into a single array. Any help would be appreciated. Thanks

Answer

You can do this in a couple of ways. Inside the RDD map you can read all the columns from the parent RDD (hBaseRDD2), transform them, and return them as another single RDD.
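For example, a minimal sketch of that first approach, assuming the hBaseRDD2 and the "Category" family with the Shoes/Clothes/T-shirts qualifiers from your question, where each row becomes one Array[String]:

    // Sketch: map each HBase Result into an Array[String] of the three cell values.
    // Assumes the "Category" column family and qualifiers used in the question.
    val arrayRDD = hBaseRDD2.map { case (_, result) =>
      Array("Shoes", "Clothes", "T-shirts").map { qualifier =>
        Bytes.toString(result.getValue(Bytes.toBytes("Category"), Bytes.toBytes(qualifier)))
      }
    }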

Or you can create a case class and map the columns to it.

For example:

case class Model(column1: String,
                 column2: String,
                 column3: String)

val hBaseRDD2 = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

val transformedRDD = hBaseRDD2.map(tuple => {
    val result = tuple._2
    Model(Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("Columnname1"))),
          Bytes.toString(result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("Columnname2"))),
          Bytes.toString(result.getValue(Bytes.toBytes("cf3"), Bytes.toBytes("Columnname3")))
    )
})
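If you just want to inspect the first row on the driver, something like this should work with the transformedRDD above; take(1) is a standard RDD action, nothing HBase-specific:

    // Pull the first mapped row back to the driver and print its three columns.
    val firstRow = transformedRDD.take(1)
    firstRow.foreach(m => println(s"${m.column1}, ${m.column2}, ${m.column3}"))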