robinlmorris robinlmorris - 5 months ago 52
Scala Question

Strings are equal but behave different: File strings don't work with spark if they are from an HDFS file

This problem is easier to explain inline with the code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

// I have a file (fileToProcess) in HDFS that contains the name of another HDFS file:
val fs = FileSystem.get(new Configuration())
val fileToProcess = "hdfs://production/user/robin/fileToProcess"
// The contents of fileToProcess is just the name of another file. In this case
// hdfs://production/user/robin/people
// I want to read hdfs://production/user/robin/people and use it in the data frame as coded below.
// However, if I do this by reading the HDFS file (fileToProcess) to get the file name like this:
val file = Path(fileToProcess)).readLine().toString

// I will get a Task not serializable exception on the last line of the script

// If I hardcode the file name like this:
val file2 = "hdfs://production/user/robin/people"
// It works great; however, I can't do this as I don't know the file I need to read in reality
// file2 and file are both Strings seem equal in every way so I am really perplexed!

// Here is what I am doing with the file to get the exception

// The contents of people:
// { "person" : "John"}
// { "person" : "Sue"}
// { "person" : "Jean"}
// { "person" : "Jane"}
// { "person" : "John"}

val df =
val peopleList = => r(0).toString).distinct.collect
val anotherList = sc.parallelize(Array("Jean", "Sue", "Bill"))

val peopleListBroadcast = sc.broadcast(peopleList)

// everything works great up to this point

val filteredPeople = anotherList.filter(x=> peopleListBroadcast.value contains x)
// here I get a Task not serializable exception if I use the file name read from the HDFS file but it works fine if I hardcode it (like with file2)

I have been stuck on this stage problem for days now. I can't seem to find a work around either. Are there differences I can't see in the string? How can two strings that are equal behave so differently. Please help me as I am going nuts trying to figure this out!

The specific exception I am getting is:

Caused by: org.apache.hadoop.hdfs.DistributedFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.hdfs.DistributedFileSystem, value: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1011603383_1, ugi=robin (auth:SIMPLE)]])
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: fs, type: class org.apache.hadoop.fs.FileSystem)

Btw, I am using Spark 1.6.1 and Scala 2.10.5. Anyone should be able to recreate this I think by making the two files in hdfs and then putting the code above in spark-shell



It has nothing to do with strings. You put in the scope an instance of org.apache.hadoop.fs.FileSystem which is not Serializable. Marking it as transient should resolve this particular issue:

@transient val fs = FileSystem.get(new Configuration())