basam basam - 5 months ago 21
Java Question

mapreduce composite Key sample - doesn't show the desired output

Being new to the MapReduce and Hadoop world, after trying out basic MapReduce programs I wanted to try a composite-key sample program.

Input dataset is as follows:

Country,State,County,populationinmillions

USA,CA,alameda,100

USA,CA,losangels,200

USA,CA,Sacramento,100

USA,FL,xxx, 10

USA,FL,yyy,12

Desired output data should be like this:

USA,CA,500

USA,FL,22

Here, the Country and State fields together form the composite key.
I am getting the following output instead — the population is not being summed for some reason. Can someone point out the mistake I am making? Please also take a look at the Country.java class, which implements the WritableComparable interface; maybe something is wrong with that implementation.

USA,CA,100

USA,CA,200

USA,CA,100

USA,FL,10

USA,FL,12

The population is not getting added per Country+State.

This is the Country class that implements WritableComparable interface.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

* The Country class implements WritabelComparator to implements custom sorting to perform group by operation. It
* sorts country and then state.
*
*/
public class Country implements WritableComparable<Country> {

Text country;
Text state;

public Country(Text country, Text state) {
this.country = country;
this.state = state;
}
public Country() {
this.country = new Text();
this.state = new Text();

}

/*
* (non-Javadoc)
*
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
public void write(DataOutput out) throws IOException {
this.country.write(out);
this.state.write(out);

}

/*
* (non-Javadoc)
*
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
public void readFields(DataInput in) throws IOException {

this.country.readFields(in);
this.state.readFields(in);
;

}

/*
* (non-Javadoc)
*
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(Country pop) {
if (pop == null)
return 0;
int intcnt = country.compareTo(pop.country);
if (intcnt != 0) {
return intcnt;
} else {
return state.compareTo(pop.state);

}
}

/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {

return country.toString() + ":" + state.toString();
}

}


Driver Program:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class CompositeKeyDriver {

    /**
     * Configures and submits the composite-key population-sum job.
     *
     * @param args args[0] = input dataset path, args[1] = output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "CompositeKeyDriver");

        // args[0] is the location of the input dataset.
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // args[1] is the location of the job output.
        Path outputPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setJarByClass(CompositeKeyDriver.class);
        job.setMapperClass(CompositeKeyMapper.class);
        job.setReducerClass(CompositeKeyReducer.class);

        // Mapper and reducer both emit (Country, IntWritable), so these
        // settings also cover the map output key/value classes.
        job.setOutputKeyClass(Country.class);
        job.setOutputValueClass(IntWritable.class);

        // Delete any previous output recursively so re-runs do not fail with
        // "output directory already exists". delete(Path, boolean) replaces
        // the deprecated single-argument delete(Path).
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Mapper program:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


// First two parameters are Input Key and Input Value. Input Key = offset of each line (remember each line is a record). Input value = Line itself
// Second two parameters are Output Key and Output value of the Mapper. BTW, the outputs of the mapper are stored in the local file system and not on HDFS.
// Output Key = Country object is sent. Output Value = population in millions in that country + state combination


public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> {

/** The cntry. */
Country cntry = new Country();

/** The cnt text. */
Text cntText = new Text();

/** The state text. */
Text stateText = new Text();

//population in a Country + State
IntWritable populat = new IntWritable();

/**
*
* Reducer are optional in Map-Reduce if there is no Reducer defined in program then the output of the Mapper
* directly write to disk without sorting.
*
*/

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//Reader will give each record in a line to the Mapper.
//That line is split with the de-limiter ","
String line = value.toString();

String[] keyvalue = line.split(",");


//Country is the first item in the line in each record
cntText.set(new Text(keyvalue[0]));

//State is the second item in the line in each record
stateText.set(keyvalue[1]);

//This is the population. BTW, we can't send Java primitive datatypes into Context object. Java primitive data types are not effective in Serialization and De-serialization.
//So we have to use the equivalent Writable datatypes provided by mapreduce framework

populat.set(Integer.parseInt(keyvalue[3]));

//Here you are creating an object of Country class and in the constructor assigning the country name and state
Country cntry = new Country(cntText, stateText);

//Here you are passing the country object and their population to the context object.
//Remember that country object already implements "WritableComparable" interface which is equivalient to "Comparable" interface in Java. That implementation is in Country.java class
//Because it implements the WritableComparable interface, the Country objects can be sorted in the shuffle phase. If WritableComparable interface is not implemented, we
//can't sort the objects.

context.write(cntry, populat);

}
}


Reducer program:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


//Remember the two output parameters of the Mapper class will become the first two input parameters to the reducer class.

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

// The first parameter to reduce method is "Country". The country object has country name and state name (look at the Country.java class for more details.
// The second parameter "values" is the collection of population for Country+State (this is a composite Key)

public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {

int numberofelements = 0;

int cnt = 0;

while (values.hasNext()) {

cnt = cnt + values.next().get();

}

context.write(key, new IntWritable(cnt));

}

}

Answer

You're using the default HashPartitioner, so your Country class needs to implement the hashCode() method (and, for consistency, equals()).

At the moment it inherits Object's identity-based hashCode(), so equal (country, state) keys are not guaranteed to be routed to the same reducer partition.

Also check your reducer's signature: reduce must take Iterable&lt;IntWritable&gt;, not Iterator&lt;IntWritable&gt;. With Iterator it does not override Reducer.reduce, so the framework's default identity reduce runs and each mapper record is written out unaggregated — which matches the output you are seeing.

Here's an example hashCode() method:

@Override
public int hashCode() {
    // Standard 31-prime combination of both key components, null-safe.
    final int prime = 31;
    int countryHash = (country == null) ? 0 : country.hashCode();
    int stateHash = (state == null) ? 0 : state.hashCode();
    return prime * (prime + countryHash) + stateHash;
}

Additional information:

To be on the safe side you should copy incoming Text values instead of storing references to them. At the moment your Country constructor keeps references to the mapper's reused Text objects.

public Country(Text country, Text state) {
    this.country = country;
    this.state = state;
}

You should change this to:

public Country(Text country, Text state) {
    // Copy the values rather than call set() on the fields: in the posted
    // class the Text fields are only allocated in the no-arg constructor,
    // so this.country.set(...) here would throw a NullPointerException.
    this.country = new Text(country);
    this.state = new Text(state);
}
Comments