user1872329 - 1 month ago

Java hadoop wrong value class: class RatioCount$WritableArray is not class org.apache.hadoop.io.DoubleWritable

I am trying to learn Hadoop. I have a text file where each line contains one traffic flow, with the fields separated by commas. I want my map function to output, as the key, a string I build to identify one flow, something like "123.124.32.6 14.23.64.21 80 tcp", and as the value a single double. I want my reduce function to output the same string as the key and, as the value, an array collecting the values of all records that share that key. So I want something like this:
"123.124.32.6 14.23.64.21 80 tcp": [0.3 -0.1 1 -1 0.5]
as my final output.
When I run it I get an error:


Error: java.io.IOException: wrong value class: class
RatioCount$WritableArray is not class
org.apache.hadoop.io.DoubleWritable


Could you please point out my mistake and how to fix it?

Here is my code:

import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RatioCount {

    public static class WritableArray extends ArrayWritable {

        public WritableArray(Class<? extends Writable> valueClass, Writable[] values) {
            super(valueClass, values);
        }

        public WritableArray(Class<? extends Writable> valueClass) {
            super(valueClass);
        }

        @Override
        public DoubleWritable[] get() {
            return (DoubleWritable[]) super.get();
        }

        @Override
        public void write(DataOutput arg0) throws IOException {
            System.out.println("write method called");
            super.write(arg0);
        }

        @Override
        public String toString() {
            return Arrays.toString(get());
        }
    }



    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "ratio count");

        job.setJarByClass(RatioCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputValueClass(WritableArray.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }



    public static class MyReducer
            extends Reducer<Text, DoubleWritable, Text, WritableArray> {

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            ArrayList<DoubleWritable> list = new ArrayList<DoubleWritable>();

            for (DoubleWritable value : values) {
                // Hadoop reuses the same DoubleWritable instance while iterating,
                // so copy the value instead of storing the reference.
                list.add(new DoubleWritable(value.get()));
            }
            context.write(key, new WritableArray(DoubleWritable.class, list.toArray(new DoubleWritable[list.size()])));
        }
    }




    public static class MyMapper extends Mapper<Object, Text, Text, DoubleWritable> {

        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Skip the header line.
            if (value.toString().contains("StartTime")) {
                return;
            }
            String[] tokens = value.toString().split(",");

            // Build the flow key, e.g. "123.124.32.6 14.23.64.21 80 tcp".
            StringBuilder sb = new StringBuilder();
            sb.append(tokens[2]).append(' ');
            sb.append(tokens[3]).append(' ');
            sb.append(tokens[6]).append(' ');
            sb.append(tokens[7]);
            System.out.println(sb.toString());
            word.set(sb.toString());

            double sappbytes = Double.parseDouble(tokens[13]);
            double totbytes = Double.parseDouble(tokens[14]);
            double dappbytes = totbytes - sappbytes;
            DoubleWritable ratio = new DoubleWritable((sappbytes - dappbytes) / totbytes);
            context.write(word, ratio);
        }
    }
}

Answer

Your problem is this line:

job.setCombinerClass(MyReducer.class);

A combiner must consume and emit the same key/value types as the map output. In your case you have:

Reducer<Text, DoubleWritable, Text, WritableArray>, which outputs WritableArray values, but the reduce phase that follows still expects DoubleWritable.
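Concretely, with the configuration in your main() the intermediate values are checked against DoubleWritable, so the combiner's WritableArray output is rejected. The lines below are from your own main(), just annotated:

    job.setMapOutputValueClass(DoubleWritable.class); // intermediate values must be DoubleWritable
    job.setCombinerClass(MyReducer.class);            // but MyReducer emits WritableArray values
    // When the combiner's output is written back into the map-side spill files,
    // Hadoop checks each value against the declared map output value class and throws:
    //   wrong value class: ... WritableArray is not ... DoubleWritable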

You should remove the combiner, or rewrite it (as a separate class from your reducer) so that it takes in Text, DoubleWritable and emits those same types, as sketched below.
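If you do want to keep a combine stage, a minimal sketch of a type-correct combiner looks like this (MyCombiner is a hypothetical name; it would sit inside RatioCount next to MyReducer and use the same imports). Because your reducer only collects every value into an array, there is nothing useful to pre-aggregate, so this version just passes values through:

    public static class MyCombiner
            extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Forward each value unchanged: input and output are both DoubleWritable,
            // so the shuffle never sees a WritableArray where it expects a DoubleWritable.
            for (DoubleWritable value : values) {
                context.write(key, value);
            }
        }
    }

You would then register it with job.setCombinerClass(MyCombiner.class);. In practice, simply deleting the setCombinerClass line is the cleaner option here.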
