stian stian - 2 months ago 15
Scala Question

how to parse a CSV file with dataVec using a schema?

I am trying to load a CSV data set with canova/datavec, and can not find the "idiomatic" way of doing it. I struggle a bit since I feel that there is an evolution of the framework, which makes it difficult for me to determine what is relevant and what is not.

object S extends App{
val recordReader:RecordReader = new CSVRecordReader(0, ",")
recordReader.initialize(new FileSplit(new File("./src/main/resources/CSVdataSet.csv")))
val iter:DataSetIterator = new RecordReaderDataSetIterator(recordReader, 100)
while(iter.hasNext){
println(iter.next())
}
}


I have a csv file that starts with a header description, and thus my output is an exception

(java.lang.NumberFormatException: For input string: "iid":)


I started looking into the schema builder, since I get an exception because of schema/the header. So I was thinking to add a schema like this;

val schema = new Schema.Builder()
.addColumnInteger("iid")
.build()


From my point of view, the noob-view, the BasicDataVec-examples are not completely clear because they link it to spark etc. From the IrisAnalysisExample (https://github.com/deeplearning4j/dl4j-examples/blob/master/datavec-examples/src/main/java/org/datavec/transform/analysis/IrisAnalysis.java).
I assume that the file content is first read into JavaRDD (potentially a Stream) and then treated afterwards. The schema is not used except for the DataAnalysis.

So, could someone help with making me understand how I parse (as a stream or iterator, a CSV-file with a header description as the first line?

I understand from their book (Deep learning:A practitioners Approach) that spark is needed for data transformation (which a schema is used for). I thus rewrote my code to;

object S extends App{
val schema: Schema = new Schema.Builder()
.addColumnInteger("iid")
.build
val recordReader = new CSVRecordReader(0, ",")
val f = new File("./src/main/resources/CSVdataSet.csv")
recordReader.initialize(new FileSplit(f))
val sparkConf:SparkConf = new SparkConf()
sparkConf.setMaster("local[*]");
sparkConf.setAppName("DataVec Example");
val sc:JavaSparkContext = new JavaSparkContext(sparkConf)
val lines = sc.textFile(f.getAbsolutePath);
val examples = lines.map(new StringToWritablesFunction(new CSVRecordReader()))
val process = new TransformProcess.Builder(schema).build()
val executor = new SparkTransformExecutor()
val processed = executor.execute(examples, process)
println(processed.first())
}


I thought now that the schema would dictate that I only would have the iid-column, but the output is:

[iid, id, gender, idg, .....]

Answer

It might be considered bad practice to answer my own question, but I will keep my question (and now answer) for a while to see if it was informative and useful for others.

I understand how to use a schema on data where I can create corresponding schema attribute for all of the features. I originally wanted to work on a dataset with more than 200 feature values in each vector. Having to declare a static schema containing a column attribute for all 200 features made it impractical to use. However, there is probably a more dynamic way of creating schemas, and I just have not found that yet. I decided to test my code on the Iris.csv data set. Here the file contains row attributes for;

Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species 

Which would be implemented as a schema:

 val schema: Schema = new Schema.Builder()
    .addColumnInteger("Id")
    .addColumnDouble("SepalLengthCm")
    .addColumnDouble("SepalWidthCm")
    .addColumnDouble("PetalLengthCm")
    .addColumnDouble("PetalWidthCm")
    .addColumnString("Species")
    .build   

I feel that one of the motives behind using a schema is to be able to transform the data. Thus, I would like to perform a transform operation. A TransformProcess defines a sequence of operations to perform on our data (Using DataVec appendix F page 405 DeepLearning: A practitioners Approach).

A TransformProcess is constructed by specifying two things:
   • The Schema of the initial input data
   • The set of operations we wish to execute Using DataVec

I decided to see if I could remove a column from the read data:

val process = new TransformProcess.Builder(schema)
  .removeColumns("Id")
  .build()

Thus, my code became:

import org.datavec.api.records.reader.impl.csv.CSVRecordReader
import org.datavec.api.transform.{DataAction, TransformProcess}
import org.datavec.api.transform.schema.Schema
import java.io.File
import org.apache.spark.api.java.JavaSparkContext
import org.datavec.spark.transform.misc.StringToWritablesFunction
import org.apache.spark.SparkConf
import org.datavec.api.split.FileSplit
import org.datavec.spark.transform.SparkTransformExecutor

object S extends App{
   val schema: Schema = new Schema.Builder()
      .addColumnInteger("Id")
      .addColumnDouble("SepalLengthCm")
      .addColumnDouble("SepalWidthCm")
      .addColumnDouble("PetalLengthCm")
      .addColumnDouble("PetalWidthCm")
      .addColumnString("Species")
      .build

  val recordReader = new CSVRecordReader(0, ",")
  val f = new File("./src/main/resources/Iris.csv")
  recordReader.initialize(new FileSplit(f))
  println(recordReader.next())
  val sparkConf:SparkConf = new SparkConf()
  sparkConf.setMaster("local[*]");
  sparkConf.setAppName("DataVec Example");
  val sc:JavaSparkContext = new JavaSparkContext(sparkConf)
  val lines = sc.textFile(f.getAbsolutePath);
  val examples = lines.map(new StringToWritablesFunction(new CSVRecordReader()))
  val process = new TransformProcess.Builder(schema)
      .removeColumns("Id")
      .build()
  val executor = new SparkTransformExecutor()
  val processed = executor.execute(examples, process)
  println(processed.first())
}

The first prints:

[Id, SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species]

the second prints

[SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species]