
How to read a CSV file as a DataFrame?

I would like to read a CSV file in Spark and store it in a Spark DataFrame. https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

I have tried:

scala> val df = sqlContext.load("/home/donbeo/Documents/dataset/winequality-red.csv")
java.lang.RuntimeException: file:/home/donbeo/Documents/dataset/winequality-red.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


but it is not working.
What is the right command?

Answer

Spark SQL provides built-in support for only three types of data sources (examples below):

  1. Parquet (the default)
  2. JSON
  3. JDBC
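
For reference, this is roughly how each built-in source is loaded with the Spark 1.3-era SQLContext API that your stack trace suggests you are using (a minimal sketch; the file names and JDBC connection details here are made up):

val parquetDF = sqlContext.load("people.parquet")                  // Parquet is the default source
val jsonDF    = sqlContext.load("people.json", "json")             // pass the source name explicitly
val jdbcDF    = sqlContext.jdbc("jdbc:postgresql:dbserver", "people") // read a database table over JDBC

This also explains your error: with no source specified, load assumed Parquet and failed on the CSV file's bytes ("expected magic number at tail [80, 65, 82, 49]" is Parquet's "PAR1" marker).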

For CSV, there is a separate library: spark-csv
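
You need to add the library to your session first. One way, assuming a Scala 2.10 build of Spark (the version number is an example; check the spark-csv releases for the one matching your setup):

$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

Or in SBT: libraryDependencies += "com.databricks" %% "spark-csv" % "1.0.3"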

Its CsvContext class provides a csvFile method, which can be used to load a CSV file:

import com.databricks.spark.csv._ // brings the implicit CsvContext into scope

val cars = sqlContext.csvFile("cars.csv") // uses implicit class CsvContext
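
Alternatively, spark-csv plugs into the generic data-sources API, so you can also load CSV through sqlContext.load by naming the source (a sketch; "header" is one of the options spark-csv accepts):

val cars = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "cars.csv", "header" -> "true"))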