Djonatan Djonatan - 2 months ago 12
Java Question

Hadoop Multiple Inputs wrongly grouped - Two Way Join Exercise

I'm trying to learn some Hadoop and have read a lot about how to do a natural join. I have two files with keys and data; I want to join them on the key and present the final result as (a, b, c).

My problem is that the reducer seems to be called separately for each file. I was expecting to receive something like (10, [R1, S10, S22]), where 10 is the key; 1, 10 and 22 are values from different rows that have 10 as their key; and R and S are tags that tell me which table each value comes from.

Instead, my reducer receives (10, [S10, S22]), and only after it has finished with the whole S file do I get another key-value pair like (10, [R1]). In other words, the values are grouped by key separately for each file, and the reducer is called for each of those groups.
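For reference, this is the grouping a reduce-side join relies on: after the shuffle, every tagged value for a key, from either file, should arrive in one reduce call. The sketch below simulates only the map and shuffle steps in plain Java, without Hadoop, using a few rows from the R.txt and S.txt samples further down. ShuffleSketch, mapR, mapS and shuffle are hypothetical names, and the TreeMap merely stands in for Hadoop's sort by key; Hadoop itself does not guarantee the order of values inside a group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Hadoop) of map + shuffle for a tagged reduce-side join.
public class ShuffleSketch {

    // Mimics FirstMap: line "a b" -> (b, "R" + a), b being the join key.
    public static String[] mapR(String line) {
        String[] f = line.trim().split("\\s+");
        return new String[] {f[1], "R" + f[0]};
    }

    // Mimics SecondMap: line "b c" -> (b, "S" + c), b being the join key.
    public static String[] mapS(String line) {
        String[] f = line.trim().split("\\s+");
        return new String[] {f[0], "S" + f[1]};
    }

    // Groups the output of both mappers by key; values from both files share one list.
    public static Map<String, List<String>> shuffle(List<String> rLines, List<String> sLines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : rLines) {
            String[] kv = mapR(line);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        for (String line : sLines) {
            String[] kv = mapS(line);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> r = Arrays.asList("1 10", "2 46", "9 46");
        List<String> s = Arrays.asList("10 24", "46 10", "46 21");
        // prints {10=[R1, S24], 46=[R2, R9, S10, S21]}
        System.out.println(shuffle(r, s));
    }
}
```

Each key's list mixes R and S values, which is what the reducer needs in order to build the cross product.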

I'm not sure whether that is the correct behavior, whether I have to configure it differently, or whether I'm doing everything wrong.

I'm also new to Java, so the code might look bad to you.

I'm avoiding the TextPair data type because I can't write something like that myself yet, and I think this should be another valid approach (just in case you are wondering). Thanks

I'm running Hadoop 2.4.1, and the code is based on the WordCount example.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class TwoWayJoin {

    // Mapper for R.txt: each line is "a b", where b is the join key.
    // Emits (b, "R" + a) so the reducer can tell which file the value came from.
    public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text a = new Text();
            Text b = new Text();

            a.set(tokenizer.nextToken());
            b.set(tokenizer.nextToken());

            Text relation = new Text("R" + a.toString());

            output.collect(b, relation);
        }
    }

    // Mapper for S.txt: each line is "b c", where b is the join key.
    // Emits (b, "S" + c).
    public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text b = new Text();
            Text c = new Text();

            b.set(tokenizer.nextToken());
            c.set(tokenizer.nextToken());

            Text relation = new Text("S" + c.toString());

            output.collect(b, relation);
        }
    }

    // Splits the tagged values by relation and emits every "a,b,c" combination for the key.
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            ArrayList<Text> relationS = new ArrayList<Text>();
            ArrayList<Text> relationR = new ArrayList<Text>();

            while (values.hasNext()) {
                String relationValue = values.next().toString();
                // The first character is the tag; the rest is the original column value.
                if (relationValue.startsWith("R")) {
                    relationR.add(new Text(relationValue.substring(1)));
                } else {
                    relationS.add(new Text(relationValue.substring(1)));
                }
            }

            for (Text r : relationR) {
                for (Text s : relationS) {
                    output.collect(key, new Text(r + "," + key.toString() + "," + s));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Pass the job's own class so Hadoop can locate the jar that contains it.
        JobConf conf = new JobConf(TwoWayJoin.class);
        conf.setJobName("TwoWayJoin");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // args[0] is the R file (read by FirstMap), args[1] is the S file (read by SecondMap).
        MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
        MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);

        Path output = new Path(args[2]);

        FileOutputFormat.setOutputPath(conf, output);

        // Delete any previous output so the job does not fail on an existing directory.
        FileSystem.get(conf).delete(output, true);

        JobClient.runJob(conf);
    }
}
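The reducer's tag handling can be checked outside Hadoop as a pure function. This is a plain-Java sketch, not the asker's code: the class JoinReduceSketch and its method joinValues are hypothetical, plain strings stand in for Text, and every value is assumed to carry a one-character R or S tag as produced by the two mappers. Stripping the tag is what turns R2 and S10 into the a and c columns of the expected a,b,c output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the join step in Reduce: split by tag, drop the tag,
// and emit the cross product of the R and S values as "a,b,c".
public class JoinReduceSketch {

    public static List<String> joinValues(String key, List<String> taggedValues) {
        List<String> rValues = new ArrayList<>();
        List<String> sValues = new ArrayList<>();
        for (String v : taggedValues) {
            // Only the first character is the tag.
            if (v.startsWith("R")) {
                rValues.add(v.substring(1));
            } else if (v.startsWith("S")) {
                sValues.add(v.substring(1));
            }
        }
        List<String> out = new ArrayList<>();
        for (String a : rValues) {
            for (String c : sValues) {
                out.add(a + "," + key + "," + c);
            }
        }
        return out; // empty whenever one relation has no values for this key
    }

    public static void main(String[] args) {
        // prints [2,46,10, 2,46,21, 9,46,10, 9,46,21]
        System.out.println(joinValues("46", Arrays.asList("R2", "S10", "R9", "S21")));
        // prints [] because only the S side is present
        System.out.println(joinValues("10", Arrays.asList("S24")));
    }
}
```

The second call shows the symptom described in the question: if a reduce call only ever sees one relation, the nested loop produces nothing.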


R.txt

(columns: a b, where b is the key)
2 46
1 10
0 24
31 50
11 2
5 31
12 36
9 46
10 34
6 31


S.txt

(columns: b c, where b is the key)
45 32
45 45
46 10
36 15
45 21
45 28
45 9
45 49
45 18
46 21
45 45
2 11
46 15
45 33
45 6
45 20
31 28
45 32
45 26
46 35
45 36
50 49
45 13
46 3
46 8
31 45
46 18
46 21
45 26
24 15
46 31
46 47
10 24
46 12
46 36


The job finishes successfully, but the output is empty, because in every reduce call either the R list or the S list is empty.

If I simply collect the mapped rows one by one without any processing, all of them appear in the output.

The expected output is one line per joined row, in the form

key "a,b,c"

Answer

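The problem is the combiner. Remember that a combiner applies the reduce function to the output of each map task, before the sort and shuffle. With MultipleInputs, each map task here reads only one of the two files, so the combiner sees only R values or only S values for a key. In effect, your reduce function is applied to the R and S relations separately, which is why they end up in different reduce calls: in each combiner call one of the two lists is empty, so the join emits nothing. Comment out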

conf.setCombinerClass(Reduce.class);

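and run the job again; there should not be any problem. As a side note, a combiner is only useful (and only correct) when applying the reduce function to partial map output and then again after the sort and shuffle gives the same result as applying it once to all the values, as with a sum or a maximum. Hadoop may also run the combiner zero, one or several times, so the job must not depend on it. A join does not have this property.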
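To see why the combiner breaks the join, here is a plain-Java simulation rather than Hadoop code. It assumes one map task per input file and that the combiner runs once per task (real Hadoop may run it zero or more times). CombinerSketch, join and sum are hypothetical names; the key 10 values come from the sample data (row 1 10 in R.txt and row 10 24 in S.txt).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration (no Hadoop): a combiner only sees one map task's output,
// and here each map task reads only R.txt or only S.txt.
public class CombinerSketch {

    // Same idea as Reduce: split by tag, strip it, emit the R x S cross product.
    public static List<String> join(String key, List<String> tagged) {
        List<String> r = new ArrayList<>();
        List<String> s = new ArrayList<>();
        for (String v : tagged) {
            if (v.startsWith("R")) {
                r.add(v.substring(1));
            } else if (v.startsWith("S")) {
                s.add(v.substring(1));
            }
        }
        List<String> out = new ArrayList<>();
        for (String a : r) {
            for (String c : s) {
                out.add(a + "," + key + "," + c);
            }
        }
        return out;
    }

    public static int sum(List<Integer> xs) {
        int total = 0;
        for (int x : xs) {
            total += x;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> fromRTask = Arrays.asList("R1");  // key 10, map task over R.txt
        List<String> fromSTask = Arrays.asList("S24"); // key 10, map task over S.txt

        // Join used as a combiner: each task is joined on its own, one side is
        // always empty, so nothing is passed on to the reducer.
        List<String> afterCombiner = new ArrayList<>();
        afterCombiner.addAll(join("10", fromRTask));
        afterCombiner.addAll(join("10", fromSTask));
        System.out.println(afterCombiner); // []

        // No combiner: the reducer receives both tagged values for key 10.
        List<String> shuffled = new ArrayList<>(fromRTask);
        shuffled.addAll(fromSTask);
        System.out.println(join("10", shuffled)); // [1,10,24]

        // A sum, by contrast, is safe to combine: partial sums add up to the total.
        int combinedSum = sum(Arrays.asList(sum(Arrays.asList(1, 2)), sum(Arrays.asList(3))));
        System.out.println(combinedSum == sum(Arrays.asList(1, 2, 3))); // true
    }
}
```

The contrast is the point of the side note: summing partial results gives the same total, while joining partial results loses every match that spans the two files.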