Scala Question

Adding custom code to Hadoop/Spark -- compression codec

When dealing with data compression, Spark supports the various compression schemes available in the underlying Hadoop infrastructure, e.g. Snappy (the default), LZ4, LZF, and GZIP.

How can one specify that a user-built, custom codec be used instead of the existing ones? Say, for instance, my codec is called DUMB. How can I use DUMB instead of the default Snappy? I looked into the CompressionCodecFactory class (https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/io/compress/CompressionCodecFactory.html), but I still don't quite understand how to get things wired up. Has anyone done something similar to this before, or have any hints?




EDIT: Based on the answer by @Paweł_Jurczenko, I have added more detail.

Here is DUMB:

public class Dumb {

    public Dumb() {
    }

    // Hands the buffers to CallCompressor, which does the actual compression.
    public int CompressIt(InBuffers inBuffs) {
        return CallCompressor(inBuffs);
    }
}


*InBuffers is a class that wraps a List of buffers to be compressed, and CallCompressor does the actual low-level compression work.

To use DUMB in the main class, I would do:

Dumb myDumbComp = new Dumb();
myDumbComp.CompressIt(inBuffs); // inBuffs is a List of individual input buffers


But now, I want to have a standard CompressionCodec interface through which I can invoke the method in Hadoop.

Answer

First of all, your custom codec should implement the appropriate CompressionCodec interface: org.apache.spark.io.CompressionCodec if Spark should use it for its internal compression, or org.apache.hadoop.io.compress.CompressionCodec if it should also be usable for compressing Hadoop output. For the Spark-internal case, you should set the spark.io.compression.codec property to the fully qualified class name of your codec, e.g.:

val sparkConf: SparkConf = new SparkConf()
  .setAppName("...")
  .set("spark.io.compression.codec", "com.organization.compress.CustomCodec")
val sc: SparkContext = new SparkContext(sparkConf)
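
The class named in spark.io.compression.codec is expected to implement Spark's own org.apache.spark.io.CompressionCodec trait (two stream-wrapping methods) and to have a constructor that takes a SparkConf, since Spark instantiates it by reflection. A minimal sketch of such an adapter around DUMB could look like the following; DumbOutputStream and DumbInputStream are hypothetical stream wrappers you would write around your compressor, not existing classes:

import java.io.{InputStream, OutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Adapter that plugs the DUMB compressor into Spark's internal compression.
// Spark creates it by reflection, passing the SparkConf to the constructor.
class CustomCodec(conf: SparkConf) extends CompressionCodec {

  // Wrap the raw stream so that everything written to it is compressed by DUMB.
  override def compressedOutputStream(s: OutputStream): OutputStream =
    new DumbOutputStream(s) // hypothetical: compresses via Dumb/CallCompressor

  // Wrap the raw stream so that reads return the decompressed bytes.
  override def compressedInputStream(s: InputStream): InputStream =
    new DumbInputStream(s) // hypothetical: decompresses via Dumb
}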

From now on, your CustomCodec will be used during the compression of internal Spark data, like RDD partitions, broadcast variables, and shuffle outputs. Of course, your codec must be present on the classpath. If you'd like to compress your output data as well, you should set the following properties on hadoopConfiguration (which is part of SparkContext):

sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress.codec", "com.organization.compress.CustomCodec")
sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")