underwood - 1 year ago
Scala Question

Pass columnNames dynamically to cassandraTable().select()

I'm reading a query from a file at run-time and executing it in a Spark + Cassandra environment.

I'm executing :

sparkContext.cassandraTable("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

Query in file :

select col1, col2, col3
from keyspaceName.colFamilyName
where somecondition = true

Here col1, col2, col3 can vary depending on the query parsed from the file.

Question :

How do I pick the column names from the query and pass them to select() at runtime?

I have tried many ways to do it :

1. dumbest thing done (which obviously threw an error) -

var str = "col1,col2,col3"
var selectStmt = str.split("\\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable().select(selectStmt)

Any ideas are welcome.

Side Notes :

1. I do not want to use cassandraContext because it will be deprecated/removed in the next release (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)

2. I'm on

- a. Scala 2.11

- b. spark-cassandra-connector_2.11:1.6.0-M1

- c. Spark 1.6

Answer Source

Use Cassandra Connector

Your use case sounds like you actually want to use CassandraConnector objects. These give you direct access to a per-executor-JVM session pool and are ideal for executing arbitrary queries. This will end up being much more efficient than creating an RDD for each query.

This would look something like

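val connector = CassandraConnector(sc.getConf) // import com.datastax.spark.connector.cql.CassandraConnector
rddOfStatements.mapPartitions { it =>
  connector.withSessionDo { session =>
    it.map(statement => session.execute(statement))
  }
}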

But you most likely would want to use executeAsync and handle the futures separately for better performance.

Programmatically specifying columns in cassandraTable

The select method takes ColumnRef* which means you need to pass in some number of ColumnRefs. Normally there is an implicit conversion from String --> ColumnRef which is why you can pass in just a var-args of strings.

Here it's a little more complicated because we want to pass a sequence as varargs and its elements are of another type (String), so we would need two implicit conversions at once, and Scala doesn't like that.

So instead we convert the strings to ColumnName objects ourselves and pass those as varargs (:_*).
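To see why the explicit conversion is needed, here is a minimal sketch in plain Scala that does not depend on the connector. The ColumnRef, ColumnName and select definitions below are simplified stand-ins written for illustration only; they are not the connector's real classes, and the real implicit conversion may be declared elsewhere.

```scala
import scala.language.implicitConversions

object VarargsDemo {
  // Hypothetical stand-ins for the connector's types
  sealed trait ColumnRef { def name: String }
  final case class ColumnName(name: String) extends ColumnRef

  object ColumnRef {
    // Implicit view String => ColumnRef, applied to one value at a time
    implicit def fromString(s: String): ColumnRef = ColumnName(s)
  }

  // Stand-in for select(columns: ColumnRef*); returns the selected names
  def select(columns: ColumnRef*): Seq[String] = columns.map(_.name)

  val columns: Seq[String] = Seq("id", "txt")

  // Works: each String literal is converted individually
  val fromLiterals: Seq[String] = select("id", "txt")

  // Does not compile: there is no implicit view Seq[String] => Seq[ColumnRef]
  // val broken = select(columns: _*)

  // Works: convert the elements explicitly, then expand as varargs
  val fromSeq: Seq[String] = select(columns.map(ColumnName(_)): _*)
}
```

The same pattern, columns.map(ColumnName(_)): _*, is what gets passed to the real select().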

 Keyspace: test
 Table: dummy
 - id                      : java.util.UUID                                                                   (partition key column)
 - txt                     : String

val columns = Seq("id", "txt")
columns: Seq[String] = List(id, txt)

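//Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
sc.cassandraTable("test", "dummy").select(columns.map(ColumnName(_)): _*).collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})

//Only use the first column
sc.cassandraTable("test", "dummy").select(columns.map(ColumnName(_)).take(1): _*).collect
Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})

//Only use the last column
sc.cassandraTable("test", "dummy").select(columns.map(ColumnName(_)).takeRight(1): _*).collect
Array(CassandraRow{txt: hello world})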
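For the original question of getting the column names out of the query text, one possible approach is a small regex-based parser. This is only a sketch: QueryParts, Parsed and parse are hypothetical names, and it assumes the simple shape shown in the question (select list, keyspace.table, optional where clause). It will not handle quoted identifiers, aliases, or function calls that contain commas.

```scala
object QueryParts {
  // (?i) case-insensitive, (?s) lets '.' match newlines.
  // Groups: select list, keyspace, table, optional where clause.
  private val Pattern =
    """(?is)\s*select\s+(.+?)\s+from\s+(\w+)\.(\w+)(?:\s+where\s+(.+?))?\s*;?\s*""".r

  final case class Parsed(
    columns: List[String],
    keyspace: String,
    table: String,
    where: Option[String]
  )

  // Returns None when the text does not have the expected shape
  def parse(query: String): Option[Parsed] = query match {
    case Pattern(cols, ks, tbl, cond) =>
      val columns = cols.split(",").map(_.trim).filter(_.nonEmpty).toList
      // cond is null when the where clause is absent, so wrap it in Option
      Some(Parsed(columns, ks, tbl, Option(cond)))
    case _ => None
  }
}

// Intended use with the connector (not executed here):
//   val p = QueryParts.parse(queryText).get
//   val rdd = sc.cassandraTable(p.keyspace, p.table).select(p.columns.map(ColumnName(_)): _*)
//   p.where.map(rdd.where(_)).getOrElse(rdd)
```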