Java Question

How to Parse a CSV or JSON File with Apache Spark

I am new to Spark and have gone through the Spark docs.
From my limited understanding I gather that we can load any file, traverse it line by line, and filter it with some criterion (e.g. lines containing the keyword "ERROR").
One can find the total count of lines as well as the sub-RDD that contains the filtered content.
We can also compute word count, PageRank, etc., but those examples all handle only one criterion.
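
For context, the kind of line-by-line filtering and counting I mean looks roughly like this (a minimal sketch; the log file path is made up):

    JavaSparkContext sc = new JavaSparkContext("local", "example");
    JavaRDD<String> lines = sc.textFile("path/app.log"); // hypothetical log file
    JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
        public Boolean call(String line) { return line.contains("ERROR"); }
    });
    long total = lines.count();       // total number of lines
    long errorCount = errors.count(); // lines matching the criterion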

I want to use group-by and reduce to find the following from a CSV (one line per employee):

Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN


I would like to aggregate the above CSV by grouping on Department, Designation, and State, with additional columns for sum(costToCompany) and TotalEmployeeCount.

The result should look like:

Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000


Is there any way to achieve this using transformations and actions, or should we go for

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

Any help is much appreciated.

Answer

Procedure

  • Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor, getters and setters (sketched below)
    }
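
The elided constructor and getters, which go inside the Record class, would look roughly like this (a minimal sketch; the getters matter for approach A, since applySchema infers the schema from the JavaBean's getters):

    public Record(String department, String designation, long costToCompany, String state) {
        this.department = department;
        this.designation = designation;
        this.costToCompany = costToCompany;
        this.state = state;
    }

    // getters (and ideally setters) so applySchema can infer the schema
    public String getDepartment()  { return department; }
    public String getDesignation() { return designation; }
    public long getCostToCompany() { return costToCompany; }
    public String getState()       { return state; }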
    
  • Loading the CSV (or JSON) file

    JavaSparkContext sc; // your existing JavaSparkContext
    JavaRDD<String> data = sc.textFile("path/input.csv");
    JavaSQLContext sqlContext = new JavaSQLContext(sc);

    JavaRDD<Record> rdd_records = data.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // For JSON input you could parse with Gson instead:
             // Gson gson = new Gson();
             // return gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             return new Record(fields[0].trim(), fields[1].trim(),
                               Long.parseLong(fields[2].trim()), fields[3].trim());
          }
    });
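
If the input is JSON with one object per line rather than CSV, the same map step can use Gson, as hinted in the comments above. A minimal sketch, assuming the JSON field names match the Record fields and Gson is on the classpath:

    // each line is assumed to be a standalone JSON object, e.g.
    // {"department":"Sales","designation":"Lead","costToCompany":32000,"state":"AP"}
    JavaRDD<String> jsonLines = sc.textFile("path/input.json");
    JavaRDD<Record> rdd_records_json = jsonLines.map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             Gson gson = new Gson(); // com.google.gson.Gson
             return gson.fromJson(line, Record.class);
          }
    });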
    

At this point you have 2 approaches:

A. SparkSQL

  • Register a table (using your defined schema class)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • Query the table with your desired group-by query

    JavaSchemaRDD res = sqlContext.sql(
      "select department, designation, state, sum(costToCompany), count(*) "
      + "from record_table "
      + "group by department, designation, state");
    
  • Here you would also be able to do any other query you desire, using a SQL approach
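
For instance, to pull the aggregated rows back to the driver and print them, something like the following should work (a rough sketch against the old JavaSchemaRDD/Row API; the exact Row accessors can differ between Spark versions):

    // Row here is org.apache.spark.sql.api.java.Row
    for (Row row : res.collect()) {
        // columns follow the select order: department, designation, state, sum, count
        System.out.println(row.get(0) + "," + row.get(1) + "," + row.get(2)
            + "," + row.get(3) + "," + row.get(4));
    }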

B. Spark

  • Map to pairs using a composite key: Department;Designation;State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
      rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
          // composite key with a delimiter so the parts stay unambiguous
          return new Tuple2<String, Tuple2<Long, Integer>>(
            record.department + ";" + record.designation + ";" + record.state,
            new Tuple2<Long, Integer>(record.costToCompany, 1)
          );
        }
      });

  • reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
      records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
          public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                            Tuple2<Long, Integer> v2) throws Exception {
              return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
          }
      });
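
Finally, to turn the reduced pairs into output resembling the desired table, they can be collected (or written out with saveAsTextFile); a minimal sketch:

    for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
        String key = entry._1;                  // "Department;Designation;State"
        Tuple2<Long, Integer> value = entry._2; // (totalCost, empCount)
        System.out.println(key.replace(";", ",") + "," + value._2 + "," + value._1);
    }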
    