Paul Trehiou - 1 month ago
Scala Question

Spark issue reading a CSV

I ran into a curious behavior in spark-shell: if the working directory is /, I can't read a CSV file, but from any other directory the same read works:

trehiou@cds-stage-ms4 ~> docker run -it --rm -v /data/spark-test:/mnt/data localhost:5000/spark-master:2.0.1 sh
/ # /app/spark-2.0.1/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/11/02 10:01:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/02 10:01:59 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.17.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1478080919699).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.read.option("header", "true").option("inferSchema", "true").csv("file:///mnt/data/test.csv").printSchema()
java.io.IOException: No FileSystem for scheme: null
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:115)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.<init>(SessionCatalog.scala:89)
at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95)
at org.apache.spark.sql.internal.SessionState.catalog(SessionState.scala:95)
at org.apache.spark.sql.internal.SessionState$$anon$1.<init>(SessionState.scala:112)
at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:112)
at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:111)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:413)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:349)
... 48 elided

/ # cd /mnt/data/
/mnt/data # /app/spark-2.0.1/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/11/02 10:02:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/02 10:02:26 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.17.0.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1478080946728).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.read.option("header", "true").option("inferSchema", "true").csv("file:///mnt/data/test.csv").printSchema()
root
|-- 2MASS: string (nullable = true)
|-- RAJ2000: double (nullable = true)
|-- DEJ2000: double (nullable = true)
|-- errHalfMaj: double (nullable = true)
|-- errHalfMin: double (nullable = true)
|-- errPosAng: integer (nullable = true)
|-- Jmag: double (nullable = true)
|-- Hmag: double (nullable = true)
|-- Kmag: double (nullable = true)
|-- e_Jmag: double (nullable = true)
|-- e_Hmag: double (nullable = true)
|-- e_Kmag: double (nullable = true)
|-- Qfl: string (nullable = true)
|-- Rfl: integer (nullable = true)
|-- X: integer (nullable = true)
|-- MeasureJD: double (nullable = true)


I would like to understand why this is happening, because it does not make any sense to me. The working directory in this example is where the file I read is stored, but I tested with other random paths and it works there too. Maybe this is a bug and I should file a bug report?

EDIT: The behavior is exactly the same when reading from HDFS.

EDIT 2: I get the exact same behavior when launching a job via spark-submit.

Answer

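The default value for spark.sql.warehouse.dir is {working-dir}/spark-warehouse. When Spark is started from /, this becomes //spark-warehouse. When hadoopPath.getFileSystem(hadoopConf) is called on such a path (in SessionCatalog.makeQualifiedPath, which appears in the stack trace above), Hadoop fails to identify the scheme: a leading // is most likely parsed as the start of an authority (host) component, so the path ends up with an authority but no scheme, and the lookup fails with "No FileSystem for scheme: null". This would also explain why the error is the same for local files and for HDFS: it is the warehouse directory, not the CSV path, that fails to resolve.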
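To see why //spark-warehouse is special, the candidate path strings can be run through java.net.URI. This is only a sketch: Hadoop's Path class does its own parsing, and the assumption here is that it splits a leading // into an authority in the same way. WarehousePathDemo, defaultWarehouse and describe are hypothetical names that merely mimic the {working-dir}/spark-warehouse default.

```scala
import java.net.URI

object WarehousePathDemo {
  // Hypothetical helper mimicking the "{working-dir}/spark-warehouse" default.
  def defaultWarehouse(workingDir: String): String =
    s"$workingDir/spark-warehouse"

  // Shows how java.net.URI splits a location into scheme, authority and path.
  // Assumption: Hadoop's Path treats a leading "//" the same way.
  def describe(location: String): String = {
    val uri = new URI(location)
    s"$location -> scheme=${uri.getScheme}, " +
      s"authority=${uri.getAuthority}, path=${uri.getPath}"
  }

  def main(args: Array[String]): Unit = {
    Seq(
      defaultWarehouse("/"),         // working directory is the root
      defaultWarehouse("/mnt/data"), // any other working directory
      "file:/tmp/spark-warehouse"    // explicit override with a scheme
    ).map(describe).foreach(println)
  }
}
```

Only the root case should come out with an authority (spark-warehouse) and no scheme. The /mnt/data case has neither scheme nor authority, which presumably lets Hadoop fall back to its default filesystem, and the file:/ case names its scheme explicitly.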

The workaround is simple: start your spark-shell with this parameter overridden by some sane value starting with file:/, e.g.:

/app/spark-2.0.1/bin/spark-shell --conf spark.sql.warehouse.dir=file:/tmp/spark-warehouse
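For jobs launched via spark-submit, the same override can presumably be set in code before the session is created. A minimal sketch, assuming Spark 2.0.x is on the classpath; the object name ReadCsvJob, the app name and the /tmp/spark-warehouse location are placeholders, and the CSV path is the one from the question.

```scala
import org.apache.spark.sql.SparkSession

object ReadCsvJob {
  def main(args: Array[String]): Unit = {
    // Give the warehouse directory an explicit scheme so that it no longer
    // depends on the working directory the job was started from.
    val spark = SparkSession.builder()
      .appName("read-csv") // placeholder name
      .config("spark.sql.warehouse.dir", "file:/tmp/spark-warehouse")
      .getOrCreate()

    // Same read as in the question.
    spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("file:///mnt/data/test.csv")
      .printSchema()

    spark.stop()
  }
}
```

Passing --conf spark.sql.warehouse.dir=... to spark-submit should work as well and avoids hard-coding the location in the job.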

This unpleasant behavior might be related to this open issue, but I'm not sure; it might be a separate issue, because it does work for non-root working directories.
