Java Question

How do I determine an offset in Apache Spark?

I'm searching through some data files (~20GB). I'd like to find some specific terms in that data and mark the offset for the matches. Is there a way to have Spark identify the offset for the chunk of data I'm operating on?

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class Grep {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Grep").setMaster("spark://ourip:7077");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt"); // load the data from HDFS

        JavaRDD<String> filterData = data.filter(new Function<String, Boolean>() {
            // I'd like to do something here to get the offset in the original file of the string "babe ruth"
            public Boolean call(String s) { return s.toLowerCase().contains("babe ruth"); } // case-insensitive matching
        });

        long matches = filterData.count(); // count() triggers execution of the filter and counts the hits

        System.out.println("Lines with search terms: " + matches);
    } // end main
} // end class Grep


I'd like to do something in the "filter" operation to compute the offset of "babe ruth" in the original file. I can get the offset of "babe ruth" in the current line, but what's the process or function that tells me the offset of the line within the file?

Answer

In Spark, the common Hadoop input formats can be used. To read the byte offset from the file you can use the Hadoop class TextInputFormat (from org.apache.hadoop.mapreduce.lib.input), which is already bundled with Spark.

It reads the file as (key, value) pairs, where the key is the byte offset and the value is the line of text. The Hadoop documentation describes it:

An InputFormat for plain text files. Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Keys are the position in the file, and values are the line of text.

In Spark it can be used by calling newAPIHadoopFile():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

SparkConf conf = new SparkConf().setMaster(""); // fill in your master URL here
JavaSparkContext jsc = new JavaSparkContext(conf);

// read the content of the file using the Hadoop input format
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
        "file_path",           // input path
        TextInputFormat.class, // input format class to use
        LongWritable.class,    // class of the key (byte offset)
        Text.class,            // class of the value (line of text)
        new Configuration());

JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
    @Override
    public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
        // each record arrives as a tuple (offset, text)
        long pos = tuple._1().get();         // extract the byte offset
        String line = tuple._2().toString(); // extract the line of text

        return pos + " " + line;
    }
});
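
To tie this back to the original question, here is a minimal sketch (my addition, not part of the original answer) that filters the (offset, line) pairs for "babe ruth" and reports an absolute offset for each match. One caveat: the Hadoop key is a byte offset, while String.indexOf returns a character index, so adding the two is only exact for single-byte encodings such as ASCII.

// Hypothetical continuation of the example above (same imports assumed):
// find "babe ruth" and emit "offset: line" for each matching line.
JavaRDD<String> hits = data
    .filter(new Function<Tuple2<LongWritable, Text>, Boolean>() {
        @Override
        public Boolean call(Tuple2<LongWritable, Text> tuple) throws Exception {
            return tuple._2().toString().toLowerCase().contains("babe ruth");
        }
    })
    .map(new Function<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
            long lineStart = tuple._1().get();   // byte offset where this line begins
            String line = tuple._2().toString();
            int col = line.toLowerCase().indexOf("babe ruth"); // char index within the line
            // byte offset + char index only line up for single-byte encodings
            return (lineStart + col) + ": " + line;
        }
    });

for (String hit : hits.collect()) { // collect() triggers the job; fine for a small result set
    System.out.println(hit);
}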