robinlmorris - 2 months ago
Scala Question

Strings are equal but behave differently: file strings don't work with Spark if they come from an HDFS file

This problem is easier to explain inline with the code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

// I have a file (fileToProcess) in HDFS that contains the name of another HDFS file:
val fs = FileSystem.get(new Configuration())
val fileToProcess = "hdfs://production/user/robin/fileToProcess"
// The contents of fileToProcess is just the name of another file. In this case
// hdfs://production/user/robin/people
// I want to read hdfs://production/user/robin/people and use it in the data frame as coded below.
// However, if I do this by reading the HDFS file (fileToProcess) to get the file name like this:
val file = fs.open(new Path(fileToProcess)).readLine().toString

// I will get a Task not serializable exception on the last line of the script

// If I hardcode the file name like this:
val file2 = "hdfs://production/user/robin/people"
// It works great; however, I can't hardcode the name, because in practice I don't know which file I need to read.
// file2 and file are both Strings and seem equal in every way, so I am really perplexed!

// Here is what I am doing with the file to get the exception

// The contents of people:
// { "person" : "John"}
// { "person" : "Sue"}
// { "person" : "Jean"}
// { "person" : "Jane"}
// { "person" : "John"}

val df = sqlcontext.read.json(file)
val peopleList = df.map(r => r(0).toString).distinct.collect
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))

val peopleListBroadcast = sc.broadcast(peopleList)

// everything works great up to this point

val filteredPeople = anotherList.filter(x=> peopleListBroadcast.value contains x)
// Here I get a Task not serializable exception if I use the file name read from the HDFS file, but it works fine if I hardcode it (as with file2)


I have been stuck on this strange problem for days now, and I can't seem to find a workaround either. Are there differences in the strings that I can't see? How can two strings that are equal behave so differently? Please help me; I am going nuts trying to figure this out!

The specific exception I am getting is:

Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.hdfs.DistributedFileSystem, value: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1011603383_1, ugi=robin (auth:SIMPLE)]])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: fs, type: class org.apache.hadoop.fs.FileSystem)

Btw, I am using Spark 1.6.1 and Scala 2.10.5. Anyone should be able to reproduce this, I think, by creating the two files in HDFS and pasting the code above into spark-shell.

Thanks,
Robin

Answer

It has nothing to do with the strings. By calling FileSystem.get you brought an instance of org.apache.hadoop.fs.FileSystem into scope, and that class is not Serializable. In spark-shell, every top-level val becomes a field of a generated wrapper object (the $iwC classes in your serialization stack), so when Spark serializes the closure you pass to filter, it drags fs along even though the lambda never touches it. The two strings really are equal; what differs between your runs is whether a FileSystem instance is sitting in the captured scope. Marking it as transient should resolve this particular issue:

@transient val fs = FileSystem.get(new Configuration())
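
For reference, here is a minimal sketch of what the corrected spark-shell session would look like, using the same paths and file contents as in the question (illustrative, not tested against your cluster):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// @transient excludes this field from serialization, so the non-serializable
// DistributedFileSystem never ends up in the serialized closure.
@transient val fs = FileSystem.get(new Configuration())

val fileToProcess = "hdfs://production/user/robin/fileToProcess"
// readLine() already returns a String: the path of the file to read
val file = fs.open(new Path(fileToProcess)).readLine()

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlcontext.read.json(file)
val peopleList = df.map(r => r(0).toString).distinct.collect
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))
val peopleListBroadcast = sc.broadcast(peopleList)

// This now serializes cleanly, because fs is skipped during serialization
val filteredPeople = anotherList.filter(x => peopleListBroadcast.value contains x)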
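If you would rather not keep a FileSystem handle in the shell's scope at all, another option (a sketch, assuming fileToProcess really is a one-line text file containing nothing but the path) is to let Spark read the pointer file itself:

// Alternative: read the one-line pointer file with Spark, so no Hadoop
// FileSystem object ever enters the shell's top-level scope.
val file = sc.textFile("hdfs://production/user/robin/fileToProcess").first().trim
val df = sqlcontext.read.json(file)

The trim also guards against a stray trailing newline or whitespace in the pointer file.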