Steve - 3 months ago
Java Question

Custom word count in Java MapReduce

I'd like to count the words but only from a certain column.

My input file looks like this:


Id, EmployeeName, JobTitle, BasePay, OvertimePay, OtherPay, Benefits, TotalPay, TotalPayBenefits, Year, Notes, Agency, Status


I only want to count the job titles, but I really don't know how to do it. My main idea was to split the text, store the columns in an array, and then run a basic word count on the job title column (see the quick split check after the sample rows below). I guess that's not going to work, though, because map reads the text line by line.

This is what I tried to do:

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
import java.util.*;
import org.apache.commons.lang3.*;

public class WorkersbyJobtitle extends Configured implements Tool {
    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text jobtitle = new Text();

        public void map(LongWritable key, Text value, Context context) {
            try {
                // a negative limit keeps trailing empty fields (each row ends with a comma)
                String[] str = value.toString().split(",", -2);
                String jobtitletmp = str[2];
                StringTokenizer tokenizer = new StringTokenizer(jobtitletmp);
                while (tokenizer.hasMoreTokens()) {
                    jobtitle.set(tokenizer.nextToken());
                    context.write(jobtitle, one);
                }
            }
            catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }


    public static class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    // run() and main() not shown
}


These are the first five lines of my input file:


1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,
2,GARY JIMENEZ,CAPTAIN III (POLICE DEPARTMENT),155966.02,245131.88,137811.38,,538909.28,538909.28,2011,,San Francisco,
3,ALBERT PARDINI,CAPTAIN III (POLICE DEPARTMENT),212739.13,106088.18,16452.6,,335279.91,335279.91,2011,,San Francisco,
4,CHRISTOPHER CHONG,WIRE ROPE CABLE MAINTENANCE MECHANIC,77916.0,56120.71,198306.9,,332343.61,332343.61,2011,,San Francisco,
5,PATRICK GARDNER,DEPUTY CHIEF OF DEPARTMENT(FIRE DEPARTMENT),134401.6,9737.0,182234.59,,326373.19,326373.19,2011,,San Francisco,
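
For reference, a quick standalone check (a hypothetical SplitCheck class, separate from the MapReduce job) confirms that splitting one record on commas puts the job title at index 2:

    public class SplitCheck {
        public static void main(String[] args) {
            String line = "1,NATHANIEL FORD,GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY,"
                    + "167411.18,0.0,400184.25,,567595.43,567595.43,2011,,San Francisco,";
            // a negative limit keeps trailing empty fields (each row ends with a comma)
            String[] str = line.split(",", -2);
            System.out.println(str[2]); // GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY
        }
    }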

Answer

In the input file, each record has exactly one job title, which is already stored in

String jobtitletmp = str[2];

Since map is called once per line, str[2] holds the complete job title for that record, so there is no need to tokenize it into words. All you need to do is send this String as the key instead of looping over the tokenizer.

That is, replace

StringTokenizer tokenizer = new StringTokenizer(jobtitletmp);
while (tokenizer.hasMoreTokens()) {
    jobtitle.set(tokenizer.nextToken());
    context.write(jobtitle, one);
}

with

// StringTokenizer tokenizer = new StringTokenizer(jobtitletmp);
// while (tokenizer.hasMoreTokens()) {
//     jobtitle.set(tokenizer.nextToken());
context.write(new Text(jobtitletmp), one);
// }
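
One caveat: if the header row shown in the question (Id, EmployeeName, JobTitle, ...) is actually present in the input file, it will be counted as a job title too. A minimal guard in the mapper, assuming the header is the only line whose first field is Id, could look like this:

    String[] str = value.toString().split(",", -2);
    // assumption: only the header row has "Id" in the first field
    if (str[0].trim().equalsIgnoreCase("Id")) {
        return;
    }
    context.write(new Text(str[2].trim()), one);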

EDIT: Full code below (imports are the same as in the question):

  public class WorkersbyJobtitle extends Configured implements Tool {
    public static class MapClass extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context) {
            try {
                // JobTitle is the third column (index 2)
                String[] str = value.toString().split(",");
                context.write(new Text(str[2]), one);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public static class ReduceClass extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job j = Job.getInstance(conf, "Test"); // new Job(conf, name) is deprecated
        j.setJarByClass(WorkersbyJobtitle.class);
        j.setMapperClass(MapClass.class);
        j.setReducerClass(ReduceClass.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);

        String input = args[0];
        String output = args[1];

        Path inpath = new Path(input);
        Path outpath = new Path(output);
        FileInputFormat.setInputPaths(j, inpath);

        FileOutputFormat.setOutputPath(j, outpath);
        // remove any previous output so the job can be rerun
        outpath.getFileSystem(conf).delete(outpath, true);

        j.setNumReduceTasks(1);

        j.setInputFormatClass(TextInputFormat.class);
        j.setOutputFormatClass(TextOutputFormat.class);
        return j.waitForCompletion(true) ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        int rec = ToolRunner.run(new Configuration(), new WorkersbyJobtitle(),
                args);
        System.exit(rec);
    }
  }
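
Two optional notes. First, since the reducer only sums counts, the same ReduceClass can also be set as a combiner in run() to cut shuffle traffic; this is an extra tweak, not part of the code above:

    j.setCombinerClass(ReduceClass.class); // safe because summing is associative and commutative

Second, assuming the class is packaged into a jar (the jar name and input path below are hypothetical; the output path matches the cat command that follows), the job is launched like this:

    C:\Installs\hadoop-2.3.0>hadoop jar workersbyjobtitle.jar WorkersbyJobtitle /user/JokeR/input /user/JokeR/output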

Output:


C:\Installs\hadoop-2.3.0>hadoop fs -cat /user/JokeR/output/*
CAPTAIN III (POLICE DEPARTMENT) 2
DEPUTY CHIEF OF DEPARTMENT(FIRE DEPARTMENT) 1
GENERAL MANAGER-METROPOLITAN TRANSIT AUTHORITY 1
WIRE ROPE CABLE MAINTENANCE MECHANIC 1
