
Writing files to local system with Spark in Cluster mode

I know this is a weird way of using Spark, but I'm trying to save a dataframe to the local file system (not HDFS) using Spark even though I'm in cluster mode. I know I can use client mode, but I do want to run in cluster mode and don't care which node (out of 3) the application is going to run on as the driver.

The code below is pseudocode of what I'm trying to do.

// case class backing the example rows
case class Foo(first: String, last: String)

// create dataframe (toDF() assumes import sqlContext.implicits._ is in scope)
val df = Seq(Foo("John", "Doe"), Foo("Jane", "Doe")).toDF()
// save it to the local file system with an explicit 'file://' scheme, because
// plain paths default to hdfs:// (note the third slash: scheme + absolute path)
df.coalesce(1).rdd.saveAsTextFile("file:///path/to/file")
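
As a side note, saveAsTextFile on df.rdd writes each Row's toString (which looks like [John,Doe]); if plain comma-separated lines are wanted instead, something like the following sketch should work (same placeholder path):

// map each Row to a comma-separated line before saving
df.coalesce(1).rdd
  .map(_.mkString(","))
  .saveAsTextFile("file:///path/to/file")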


And this is how I'm submitting the Spark application.

spark-submit --class sample.HBaseSparkRSample --master yarn-cluster hbase-spark-r-sample-assembly-1.0.jar
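
For what it's worth, yarn-cluster is the older shorthand for the master; on Spark 1.6 the equivalent submission with the newer flags should look like this (same class and jar as above):

spark-submit --class sample.HBaseSparkRSample --master yarn --deploy-mode cluster hbase-spark-r-sample-assembly-1.0.jar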


This works fine in local mode but doesn't in yarn-cluster mode.

For example, java.io.IOException: Mkdirs failed to create file occurs with the above code.

I've changed the df.coalesce(1) part to df.collect and attempted to save a file using plain Scala, but it ended up with a Permission denied.
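
Roughly, that attempt looked like the sketch below (the output path is a placeholder; note that the write runs on the driver as whatever OS user the YARN container runs under, which is presumably where the Permission denied came from):

import java.io.PrintWriter

// bring all rows to the driver, then write them out with plain Java I/O
val rows = df.collect()
val writer = new PrintWriter("/path/to/file")
try rows.foreach(row => writer.println(row.mkString(",")))
finally writer.close()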

I've also tried:

  • spark-submit with the root user

  • chowned the directories to yarn:yarn, yarn:hadoop, and spark:spark

  • gave chmod 777 to the related directories

but no luck.

I'm assuming this has something to do with clusters, drivers and executors, and the user who's trying to write to the local file system, but I'm pretty much stuck solving this problem by myself.
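
To narrow it down, a small job like the sketch below should show, per node, whether the target directory is even writable (assuming sc is the SparkContext; the path is the one from my job):

// report, from each task, the host it ran on and whether the directory is writable there
val target = new java.io.File("/home/foo/work/rhbase/r/input")
sc.parallelize(1 to 3, 3).map { _ =>
  val host = java.net.InetAddress.getLocalHost.getHostName
  s"$host: exists=${target.exists}, writable=${target.canWrite}"
}.collect().foreach(println)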

I'm using:


  • Spark: 1.6.0-cdh5.8.2

  • Scala: 2.10.5

  • Hadoop: 2.6.0-cdh5.8.2



Any support is welcome and thanks in advance.

Some articles I've tried:


  • "Spark saveAsTextFile() results in Mkdirs failed to create for half of the directory" -> Tried changing users but nothing changed

  • "Failed to save RDD as text file to local file system" ->
    chmod
    didn't help me



Edited (2016/11/25)



This is the Exception I get.

java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/11/24 20:24:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)

Answer

I'm going to answer my own question because, in the end, none of the answers seemed to solve my problem. Nonetheless, thanks for all the answers and for pointing me to alternatives I could check.

I think @Ricardo was the closest in mentioning the user of the Spark application. I checked whoami with Process("whoami") and the user was yarn. The problem was probably that I tried to output to /home/foo/work/rhbase/r/input/input.csv, and although /home/foo/work/rhbase was owned by yarn:yarn, /home/foo was owned by foo:foo. I haven't checked in detail, but this may have been the cause of the permission problem.
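
For reference, the check itself was just scala.sys.process; a minimal version of what I ran inside the application:

import scala.sys.process._

// print the OS user and the working directory the driver actually runs under
println("user: " + Process("whoami").!!.trim)
println("cwd:  " + Process("pwd").!!.trim)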

When I ran pwd in my Spark application with Process("pwd"), it output /yarn/path/to/somewhere. So I decided to output my file to /yarn/input.csv, and it succeeded even in cluster mode.
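
In other words, the save itself was fine once the target was somewhere the yarn user could write; the working call was essentially this (path as in my test):

// same save as before, but into a directory the yarn user owns
df.coalesce(1).rdd.saveAsTextFile("file:///yarn/input.csv")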

I can probably conclude that this was just a simple permission issue. Any further solutions are welcome, but for now, this is how I solved the problem.