
Saving PySpark RDD to HBase raises AttributeError

I am trying to write a Spark RDD to an HBase table using PySpark. This is what the RDD looks like when printed with print rdd.take(rdd.count()):

[Decimal('0.39326837'), Decimal('0.03643601'), Decimal('0.06031798'), Decimal('0.08885452')]


When I try to write the RDD into the HBase table using the function SaveRecord:

def SaveRecord(tx_fee_rdd):
    host = 'localhost'  # sys.argv[1]
    table = 'tx_fee_table'  # needs to be created beforehand in the hbase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # row key = id, column family = tx_fee_col, column name = tx_fee, column value = x
    datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))
    datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)


tx_fee_rdd.foreach(SaveRecord)


I get the following error:

AttributeError: 'Decimal' object has no attribute 'map'


How do I deal with this?

Following @zeros323's suggestion, I now get the following error:

Traceback (most recent call last):
File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 66, in <module>
SaveRecord(tx_fee_rdd)
File "/home/ubuntu/unix_practice/bcrpc/bitcoin-inspector-webserver/bitcoin/bctxfee_text3.py", line 29, in SaveRecord
datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1348, in saveAsNewAPIHadoopDataset
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: RDD element of type [Ljava.lang.Object; cannot be used
at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:237)
at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:801)
at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Answer

foreach operates on individual records, so SaveRecord receives Decimal objects, not RDDs. You cannot call map on these, let alone use the saveAsNewAPIHadoopDataset method.
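To see what SaveRecord actually receives, here is a minimal sketch (assuming an active SparkContext named sc) that reproduces your first error:

from decimal import Decimal

rdd = sc.parallelize([Decimal('0.39326837'), Decimal('0.03643601')])
# foreach calls SaveRecord once per element, so inside the function
# tx_fee_rdd is bound to a single Decimal, not to an RDD:
rdd.foreach(SaveRecord)
# raises: AttributeError: 'Decimal' object has no attribute 'map'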

If you want to use saveAsNewAPIHadoopDataset, your function should operate directly on an RDD:

SaveRecord(tx_fee_rdd)

Another possible issue is the following part:

datamap = tx_fee_rdd.map(lambda x: ("tx_fee_col", "tx_fee", x))

saveAsNewAPIHadoopDataset expects (key, value) pairs, not triplets. It may also not work with Decimal objects. See the hbase_outputformat.py example for details.
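Putting both fixes together, here is a sketch of a corrected SaveRecord. Deriving the row key from zipWithIndex and converting every field to str are assumptions on my part; the converters are the ones from your snippet, and the (rowkey, [rowkey, column family, qualifier, value]) pair layout follows the hbase_outputformat.py example:

def SaveRecord(tx_fee_rdd):
    host = 'localhost'
    table = 'tx_fee_table'  # must already exist in the hbase shell
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    # Build (rowkey, [rowkey, column family, qualifier, value]) pairs, all strings;
    # using the element's position in the RDD as the row key is an assumption,
    # since your original comment only hints at a numeric id.
    datamap = tx_fee_rdd.zipWithIndex().map(
        lambda xi: (str(xi[1]), [str(xi[1]), "tx_fee_col", "tx_fee", str(xi[0])]))
    datamap.saveAsNewAPIHadoopDataset(conf=conf,
                                      keyConverter=keyConv,
                                      valueConverter=valueConv)

SaveRecord(tx_fee_rdd)  # called once, on the RDD itself, not via foreach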
