user2384993 user2384993 - 1 month ago 37
Scala Question

spark streaming fileStream

I'm programming with Spark Streaming but am having some trouble with Scala. I'm trying to use the function StreamingContext.fileStream.

The definition of this function is like this:

def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]


Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.
K: Key type for reading HDFS file
V: Value type for reading HDFS file
F: Input format for reading HDFS file
directory: HDFS directory to monitor for new files

I don't know how to pass the Key and Value types.
My Spark Streaming code:

val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1),
  System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar"))

// Create an input stream that monitors the directory for new files
val lines = ssc.fileStream("/home/sequenceFile")


Java code that writes the Hadoop file:

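import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MyDriver {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        // args[0] is the URI of the SequenceFile to write
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        // Keys are IntWritable and values are Text; readers must use the same types
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // Print the current file offset followed by the record being appended
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);
            }
        } finally {
            // Close the writer even if an append fails
            IOUtils.closeStream(writer);
        }
    }
}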

Answer

If you want to use fileStream, you have to supply all 3 type params when calling it, so you need to know what your Key, Value and InputFormat types are beforehand. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile")
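For that call to compile, the three types need to be imported, and the input format has to come from Hadoop's newer `org.apache.hadoop.mapreduce` API, since that is the `InputFormat` that fileStream is bounded by. Here is a minimal sketch, assuming a Spark release that uses the `org.apache.spark.streaming` package layout (older releases used different package names). `TextFileStreamSketch` and `lineStream` are hypothetical names chosen for illustration:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
// Note: the mapreduce (new API) TextInputFormat, not the one in org.apache.hadoop.mapred
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object TextFileStreamSketch {
  // Hypothetical helper: wraps the typed fileStream call from the answer.
  // With TextInputFormat the key is the byte offset of the line and the
  // value is the line itself, so only the value is kept here.
  def lineStream(ssc: StreamingContext, directory: String): DStream[String] =
    ssc.fileStream[LongWritable, Text, TextInputFormat](directory)
      .map { case (_, line) => line.toString }
}
```

You would then call something like `TextFileStreamSketch.lineStream(ssc, "/home/sequenceFile").print()` before `ssc.start()`.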

If those 3 types do happen to be your types, then you might want to use textFileStream instead as it does not require any type params and delegates to fileStream using those 3 types I mentioned. Using that would look like this:

val lines = ssc.textFileStream("/home/sequenceFile")
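Note that the Java driver in the question writes a SequenceFile with IntWritable keys and Text values, so for that data the type params would most likely need to be IntWritable, Text and a SequenceFileInputFormat rather than the text types. A sketch under the same assumption about the `org.apache.spark.streaming` package layout; `SequenceFileStreamSketch` and `pairStream` are hypothetical names, not part of Spark:

```scala
import org.apache.hadoop.io.{IntWritable, Text}
// New-API (mapreduce) SequenceFileInputFormat, as required by fileStream's type bound
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object SequenceFileStreamSketch {
  // Hypothetical helper: reads new SequenceFiles that appear in `directory`.
  // The key and value types match the ones used by the writer in the question.
  def pairStream(ssc: StreamingContext, directory: String): DStream[(Int, String)] =
    ssc.fileStream[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](directory)
      // Convert the Hadoop Writables into plain Scala values right away,
      // since Writable objects are not Java-serializable
      .map { case (key, value) => (key.get, value.toString) }
}
```

Keep in mind that the argument is a directory to monitor, not a single file, so the SequenceFile would need to be written into (or moved into) that directory after the stream has started.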