Shabarinath Volam - 2 months ago
Java Question

Saving a Java List to a Cassandra table using the Spark context

Hi, I am very new to Spark and Scala. I am facing an issue with saving data into Cassandra; below is my scenario.

1) I get a list of user-defined objects (say User objects, which contain firstName, lastName, etc.) from my Java class into my Scala class. Up to here it's fine: I am able to access the User objects and print their contents.

2) Now I want to save that usersList into a Cassandra table using the Spark context. I have gone through many examples, but everywhere I see a Seq being created from a case class with hardcoded values and then saved to Cassandra. I have tried that and it works fine for me, as below:

import scala.collection.JavaConversions._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.datastax.spark.connector._

object SparkCassandra extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("SparkCassandra")
    // set the Cassandra host address to your local address
    .set("spark.cassandra.connection.host", "127.0.0.1")
  val sc = new SparkContext(conf)
  val usersList = Test.getUsers // java.util.List[User], iterable here via JavaConversions
  usersList.foreach(x => print(x.getFirstName))
  // Hardcoded rows: build a Seq of case-class instances and save it
  val collection = sc.parallelize(Seq(userTable("testName1"), userTable("testName2")))
  collection.saveToCassandra("demo", "user", SomeColumns("name"))
  sc.stop()
}

case class userTable(name: String)
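
For reference, the save above assumes a matching keyspace and table already exist. A minimal sketch of creating them from the same SparkConf; the keyspace demo and table user are taken from the saveToCassandra call, and the replication settings are my assumption for a single local node:

import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(conf).withSessionDo { session =>
  // Assumed schema for the example above, sized for a single local node
  session.execute("CREATE KEYSPACE IF NOT EXISTS demo WITH replication = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS demo.user (name text PRIMARY KEY)")
}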


But my requirement here is to use the dynamic values from my usersList instead of hardcoded values. Is there any way to achieve this?

Answer

Finally I got a solution for my requirement; it is tested and working fine, as below:

My Scala Code:

import scala.collection.JavaConversions.asScalaBuffer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.toNamedColumnRef
import com.datastax.spark.connector.toRDDFunctions

object JavaListInsert {
  // Turn the Java User objects into an RDD of (first_name, last_name, city) tuples
  def randomStores(sc: SparkContext, users: List[User]): RDD[(String, String, String)] = {
    sc.parallelize(users).map { x =>
      val firstName = x.getFirstName
      val lastName = x.getLastName
      val city = x.getCity
      (firstName, lastName, city)
    }
  }

  def main(args: Array[String]): Unit = {
    // spark.cassandra.connection.host is assumed to be supplied externally,
    // e.g. via spark-submit --conf
    val conf = new SparkConf().setAppName("cassandraInsert")
    val sc = new SparkContext(conf)
    val usersList = Test.getUsers.toList // asScalaBuffer converts the java.util.List
    randomStores(sc, usersList).
      saveToCassandra("test", "stores", SomeColumns("first_name", "last_name", "city"))
    sc.stop()
  }
}
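
As a side note, instead of tuples with explicitly listed columns, the rows can be modeled with a case class; the connector's default column mapper translates camelCase field names to underscore-separated column names (firstName -> first_name, and so on). A minimal sketch of that variant; the Store case class name is my own choice, not part of the original code:

case class Store(firstName: String, lastName: String, city: String)

def storesAsCaseClass(sc: SparkContext, users: List[User]): RDD[Store] =
  sc.parallelize(users).map(u => Store(u.getFirstName, u.getLastName, u.getCity))

// Column names are inferred from the case class fields, so SomeColumns is optional here
storesAsCaseClass(sc, usersList).saveToCassandra("test", "stores")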

Java POJO (it implements Serializable because instances are shipped from the driver to the Spark executors inside the RDD):

import java.io.Serializable;

public class User implements Serializable {
    private static final long serialVersionUID = -187292417543564400L;
    private String firstName;
    private String lastName;
    private String city;

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }
}

Java class to return the list of Users:

import java.util.ArrayList;
import java.util.List;

public class Test {
    public static List<User> getUsers() {
        ArrayList<User> usersList = new ArrayList<User>();
        // Generate 100 dummy users: firstName_1 ... firstName_100, etc.
        for (int i = 1; i <= 100; i++) {
            User user = new User();
            user.setFirstName("firstName_" + i);
            user.setLastName("lastName_" + i);
            user.setCity("city_" + i);
            usersList.add(user);
        }
        return usersList;
    }
}
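
To verify the write, the rows can be read back through the same connector. A hedged sketch: cassandraTable comes from import com.datastax.spark.connector._, and the expected count of 100 assumes the Test data above:

import com.datastax.spark.connector._

// Read the rows back as tuples from the same keyspace/table
val stored = sc.cassandraTable[(String, String, String)]("test", "stores")
  .select("first_name", "last_name", "city")
println(s"rows written: ${stored.count()}") // expect 100 with the Test data above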