
NullPointerException in hadoop.mapreduce.lib.input.FileInputFormat.getBlockIndex when chaining two jobs

I am trying to build an inverted index.

I chain two jobs.

Basically, the first job parses and cleans the input and stores the result in a folder 'output', which is the input folder for the second job.

The second job is supposed to actually build the inverted index.

When I ran only the first job, it worked fine (at least, there were no exceptions).

I chain the two jobs like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Main {

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        String outputPath = args[1];
        String stopWordsPath = args[2];
        String finalOutputPath = args[3];

        Configuration conf = new Configuration();
        conf.set("job.stopwords.path", stopWordsPath);

        Job job = Job.getInstance(conf, "Tokenize");
        job.setJobName("Tokenize");
        job.setJarByClass(TokenizerMapper.class);

        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PostingListEntry.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PostingListEntry.class);

        job.setOutputFormatClass(MapFileOutputFormat.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(TokenizerReducer.class);

        // Delete the output directory if it exists already.
        Path outputDir = new Path(outputPath);
        FileSystem.get(conf).delete(outputDir, true);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

        //-------------------------------------------------------------------------

        Configuration conf2 = new Configuration();

        Job job2 = Job.getInstance(conf2, "BuildIndex");
        job2.setJobName("BuildIndex");
        job2.setJarByClass(InvertedIndexMapper.class);

        job2.setOutputFormatClass(TextOutputFormat.class);

        job2.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job2, new Path(outputPath));
        FileOutputFormat.setOutputPath(job2, new Path(finalOutputPath));

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(PostingListEntry.class);

        job2.setMapperClass(InvertedIndexMapper.class);
        job2.setReducerClass(InvertedIndexReducer.class);

        // Delete the output directory if it exists already.
        Path finalOutputDir = new Path(finalOutputPath);
        FileSystem.get(conf2).delete(finalOutputDir, true);

        startTime = System.currentTimeMillis();
        // THIS LINE GIVES ERROR:
        job2.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
}


I get the following exception:

Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getBlockIndex(FileInputFormat.java:444)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:413)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at Main.main(Main.java:79)


What is wrong with this configuration, and how should I chain the jobs?

Answer

It isn't clear whether you intend to use MapFileOutputFormat as the output format of the first job. The more common approach is to use SequenceFileOutputFormat there, paired with SequenceFileInputFormat as the input format of the second job.

At the moment, you've specified MapFileOutputFormat as the output of the first job and no input format at all for the second, so the second job falls back to the default, TextInputFormat, which is unlikely to work. A MapFile is also written as a directory (containing data and index files) rather than a flat file, which is likely what the split calculation in the second job trips over here.
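For concreteness, here is a minimal sketch of that change, reusing the job and job2 variables from your Main (everything else stays as you have it):

import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// First job: write the intermediate key/value pairs as a SequenceFile
// instead of a MapFile.
job.setOutputFormatClass(SequenceFileOutputFormat.class);

// Second job: read that SequenceFile back explicitly; without this the
// job defaults to TextInputFormat and cannot interpret the binary data.
job2.setInputFormatClass(SequenceFileInputFormat.class);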

Looking at your TokenizerReducer class, the signature of the reduce method is incorrect. You have:

public void reduce(Text key, Iterator<PostingListEntry> values, Context context)

It should be:

public void reduce(Text key, Iterable<PostingListEntry> values, Context context)

Because the signature doesn't match the one declared in Reducer, the framework never calls your implementation and falls back to the default identity reduce, which simply passes each key/value pair through unchanged.
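With the matching signature, the reducer skeleton would look something like the sketch below. This is only an outline: PostingListEntry and the actual merge logic are your own. Adding @Override is worth it here, since the compiler will then flag exactly this kind of mismatch.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TokenizerReducer extends Reducer<Text, PostingListEntry, Text, PostingListEntry> {

    @Override  // a compile error here would have caught the Iterator/Iterable mix-up
    public void reduce(Text key, Iterable<PostingListEntry> values, Context context)
            throws IOException, InterruptedException {
        for (PostingListEntry value : values) {
            // merge the posting entries for this term and emit the result
            context.write(key, value);
        }
    }
}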