Shabarinath Volam - 1 year ago
Java Question

Saving a Java List to a Cassandra table using the Spark context

Hi, I am very new to Spark and Scala, and I am facing an issue with saving data into Cassandra. Below is my scenario:

1) I get a list of user-defined objects (say, User objects containing firstName, lastName, etc.) from my Java class in my Scala class. Up to here everything is fine: I can access each User object and print its contents.

2) Now I want to save that usersList into a Cassandra table using the Spark context. I have gone through many examples, but everywhere I see a Seq being created from a case class with hardcoded values and then saved to Cassandra. I have tried that and it works fine for me, as below:

import scala.collection.JavaConversions._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.datastax.spark.connector._
import java.util.ArrayList

object SparkCassandra extends App {
  val conf = new SparkConf()
    //set Cassandra host address as your local address
    .set("", "")
  val sc = new SparkContext(conf)
  // Test.getUsers returns a java.util.List; JavaConversions lets it be used like a Scala collection
  val usersList = Test.getUsers
  usersList.foreach(x => print(x.getFirstName))
  // Hardcoded rows: this is the part that should come from usersList instead
  val collection = sc.parallelize(Seq(userTable("testName1"), userTable("testName1")))
  collection.saveToCassandra("demo", "user", SomeColumns("name"))
}

case class userTable(name: String)

But my requirement is to use the dynamic values from my usersList instead of hardcoded values. Is there a way to achieve this, or any other approach that would work?

Answer Source

I finally found a solution for my requirement. It is tested and working fine, as below:

My Scala Code:

import scala.collection.JavaConversions.asScalaBuffer
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.toNamedColumnRef
import com.datastax.spark.connector.toRDDFunctions

object JavaListInsert {
  // Turn the list of Java User objects into an RDD of (first name, last name, city) tuples
  def randomStores(sc: SparkContext, users: List[User]): RDD[(String, String, String)] = {
    sc.parallelize(users).map { x =>
      val firstName = x.getFirstName
      val lastName = x.getLastName
      val city = x.getCity
      (firstName, lastName, city)
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cassandraInsert")
    val sc = new SparkContext(conf)
    // Test.getUsers returns a java.util.List; asScalaBuffer converts it implicitly before .toList
    val usersList = Test.getUsers.toList
    // Tuple elements are written to the listed columns in order
    randomStores(sc, usersList).
      saveToCassandra("test", "stores", SomeColumns("first_name", "last_name", "city"))
  }
}
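The map step in randomStores is a plain per-element projection: each User becomes a (firstName, lastName, city) triple, and the order of that triple has to match the order of the columns passed to SomeColumns. As a rough illustration only, the same projection can be sketched in plain Java without Spark or Cassandra, which may help when checking the field order. The class UserRowsSketch, its trimmed-down User stand-in, the toRows helper, and the sample values are all hypothetical and not part of the answer or of the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Spark or the Cassandra connector.
class UserRowsSketch {

    // Minimal stand-in for the User POJO from the answer (getters only).
    static class User {
        private final String firstName;
        private final String lastName;
        private final String city;

        User(String firstName, String lastName, String city) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.city = city;
        }

        String getFirstName() { return firstName; }
        String getLastName() { return lastName; }
        String getCity() { return city; }
    }

    // Projects each user onto [firstName, lastName, city], the same order
    // as the tuple built in randomStores.
    static List<List<String>> toRows(List<User> users) {
        List<List<String>> rows = new ArrayList<>();
        for (User u : users) {
            rows.add(Arrays.asList(u.getFirstName(), u.getLastName(), u.getCity()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample values, used only to exercise the projection.
        List<User> users = Arrays.asList(
            new User("testFirst1", "testLast1", "testCity1"),
            new User("testFirst2", "testLast2", "testCity2"));
        for (List<String> row : toRows(users)) {
            System.out.println(row);
        }
    }
}
```

In the Scala answer the tuple plays the role of each inner list here, and saveToCassandra assigns its elements to first_name, last_name and city by position.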

Java Pojo Object:

    import java.io.Serializable;

    public class User implements Serializable{
        private static final long serialVersionUID = -187292417543564400L;
        private String firstName;
        private String lastName;
        private String city;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }
    }
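The POJO implements Serializable. That is likely relevant here because, as far as I understand it, Spark with its default Java serialization has to serialize the elements handed to sc.parallelize when it ships them to tasks, so a non-serializable class would fail at that point. One way to check a class outside Spark is a round trip through ObjectOutputStream and ObjectInputStream. The following is only a sketch under those assumptions: SerializationCheck, its one-field User stand-in, and roundTrip are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration class; it does not use Spark at all.
class SerializationCheck {

    // Stand-in for the User POJO, trimmed to a single field.
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        private String firstName;

        String getFirstName() { return firstName; }
        void setFirstName(String firstName) { this.firstName = firstName; }
    }

    // Writes obj to a byte array and reads it back. writeObject throws
    // java.io.NotSerializableException (an IOException) if obj, or an object
    // it references through a non-transient field, is not serializable.
    static Object roundTrip(Object obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        User user = new User();
        user.setFirstName("testName1");
        User copy = (User) roundTrip(user);
        System.out.println(copy.getFirstName());
    }
}
```

If the round trip fails for the real User class, the same failure would be expected once Spark tries to distribute the list.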

Java Class to return List of Users:

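import java.util.ArrayList;
import java.util.List;

public class Test {
    public static List<User> getUsers() {
        ArrayList<User> usersList = new ArrayList<User>();
        for(int i=1;i<=100;i++) {
            User user = new User();
            // set firstName, lastName and city on user here (the values are missing from the post)
            usersList.add(user);
        }
        return usersList;
    }
}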