Hafiz Mujadid - 1 month ago
Java Question

reading all files from HDFS recursively in spark java api

I'm using Spark to read the data of all files from HDFS into a single RDD, from a directory and its subdirectories as well. I could not find any efficient method to do that, so I tried to write some customized code as shown below:

public Object fetch(String source, String sink) {

    // reading data
    boolean isDir = new File(source).isDirectory();
    System.out.println("isDir=" + isDir);
    JavaRDD<String> lines;
    if (isDir) {
        lines = readFiles(new File(source).listFiles(), null);
    } else {
        lines = sc.textFile(source);
    }

    lines.saveAsTextFile(sink);
    return true;
}

public static JavaRDD<String> readFiles(File[] files, JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(), lines); // Calls same method again.
        } else {
            if (lines == null) {
                lines = sc.textFile(file.getPath());
            } else {
                JavaRDD<String> r = sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}


But this is not doing the job I expected: isDir contains false, telling me that the source is not a directory.
Can you please guide me about what's wrong? And is there some more efficient way to do this job?
Thanks a lot.

Answer

As Spark can read data based on a Hadoop job configuration, you can use the FileInputFormat#setInputDirRecursive method.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

JavaSparkContext context = new JavaSparkContext();

// Initialized to null so the variable is definitely assigned after the try/catch.
Job job = null;
try {
  job = Job.getInstance();
  FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory"));
  FileInputFormat.setInputDirRecursive(job, true); // also read nested subdirectories
} catch (IOException e1) {
  e1.printStackTrace();
  System.exit(1);
}

// Key = byte offset (LongWritable), value = line contents (Text); keep only the values.
JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
  .values();

Obviously you will end up with a Text data type instead of a String.
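
If you want a JavaRDD<String> like in your original fetch method, a simple map over the values converts each record. This is a minimal sketch: sourceData refers to the RDD built above, and the output path is just a placeholder standing in for your sink argument.

// Convert each Hadoop Text record to a plain Java String.
JavaRDD<String> lines = sourceData.map(text -> text.toString());

// For example, write the result back out, mirroring the original fetch(source, sink).
lines.saveAsTextFile("/path/to/output/directory");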