Aviral Kumar - 1 month ago
Java Question

FlatMap function on a CoGrouped RDD

I am trying to use a flatMap function on the cogrouped RDD, which has the type:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>>
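For context on that type: cogroup pairs every key that appears in either RDD with all of that key's values from rdd1 and all of its values from rdd2, and a side with no values for the key gives an empty Iterable. Below is a minimal plain-Java sketch of those semantics that runs without Spark. It is only a model. String stands in for Row, and the Group class plays the role of Tuple2<Iterable<Row>, Iterable<Row>>. Both Group and CogroupSketch are hypothetical names and are not Spark classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Tuple2<Iterable<Row>, Iterable<Row>>: values from each side.
class Group<V> {
    final List<V> left = new ArrayList<>();
    final List<V> right = new ArrayList<>();
}

// Hypothetical plain-Java model of what cogroup produces; not Spark code.
class CogroupSketch {
    // Input pairs are modelled as String[]{key, value}.
    static Map<String, Group<String>> cogroup(List<String[]> left, List<String[]> right) {
        Map<String, Group<String>> out = new TreeMap<>(); // sorted keys for stable output
        for (String[] kv : left) {
            out.computeIfAbsent(kv[0], k -> new Group<>()).left.add(kv[1]);
        }
        for (String[] kv : right) {
            out.computeIfAbsent(kv[0], k -> new Group<>()).right.add(kv[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rdd1 = Arrays.asList(new String[]{"a", "1"}, new String[]{"a", "2"}, new String[]{"b", "3"});
        List<String[]> rdd2 = Arrays.asList(new String[]{"a", "x"}, new String[]{"c", "y"});
        for (Map.Entry<String, Group<String>> e : cogroup(rdd1, rdd2).entrySet()) {
            // Prints: a -> [1, 2] / [x], then b -> [3] / [], then c -> [] / [y]
            System.out.println(e.getKey() + " -> " + e.getValue().left + " / " + e.getValue().right);
        }
    }
}
```

Each entry of the resulting map corresponds to one element that the flatMap function receives.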


My flatMap function is as follows:

static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        //logic
    }
};


But I am getting a compilation error. I am sure it must be a syntax issue, but I cannot figure out what it is.

Full Code:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> coGroupedRDD = rdd1.cogroup(rdd2);
JavaRDD<Row> jd = coGroupedRDD.flatmap(setupF);

static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        //logic
    }
};


Error:

The method flatmap(FlatMapFunction<Tuple2<String,Tuple2<Iterable<Row>,Iterable<Row>>>,Row>) is undefined for the type JavaPairRDD<String,Tuple2<Iterable<Row>,Iterable<Row>>>

Answer

A wild guess here: maybe you wrote your code against the Spark 1.6 API but are actually using a Spark 2.0 dependency? The FlatMapFunction API differs between these two releases.

In the Spark 1.6 API, the FlatMapFunction functional interface method signature is:

Iterable<R> call(T t)

In the Spark 2.0 API, the FlatMapFunction method signature is:

Iterator<R> call(T t) 
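To make the difference concrete: a 1.6-style body that builds a collection and returns it as an Iterable can usually be ported to 2.0 by returning that collection's iterator() instead. Below is a minimal plain-Java sketch of that idea. OldFlatMap and NewFlatMap are hypothetical stand-ins that mirror the two signatures above. They are not Spark's interfaces. The throws Exception clause matches how Spark's Java function interfaces declare call. FlatMapPort and its port and drain methods are also hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the Spark 1.6 shape: Iterable<R> call(T t)
interface OldFlatMap<T, R> {
    Iterable<R> call(T t) throws Exception;
}

// Hypothetical stand-in for the Spark 2.0 shape: Iterator<R> call(T t)
interface NewFlatMap<T, R> {
    Iterator<R> call(T t) throws Exception;
}

class FlatMapPort {
    // Adapts a 1.6-style function to the 2.0-style signature
    // by asking the returned Iterable for its Iterator.
    static <T, R> NewFlatMap<T, R> port(OldFlatMap<T, R> old) {
        return t -> old.call(t).iterator();
    }

    // Collects an iterator's elements into a list, roughly what flatMap
    // does with the iterator returned for each input element.
    static <R> List<R> drain(Iterator<R> it) {
        List<R> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) throws Exception {
        // 1.6 style: split a line into words and return the List (an Iterable).
        OldFlatMap<String, String> splitOld = line -> Arrays.asList(line.split(" "));
        NewFlatMap<String, String> splitNew = port(splitOld);
        System.out.println(drain(splitNew.call("hello spark world"))); // [hello, spark, world]
    }
}
```

In real Spark 2.0 code you would not need an adapter. You would simply write the call method with the Iterator return type and end it with return someList.iterator();.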

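So try changing your code to this. Also note that the method is flatMap with a capital M. Java is case-sensitive, so calling coGroupedRDD.flatmap(...) is by itself enough to produce the "is undefined" error shown above: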

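new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        List<Row> result = new ArrayList<>();
        // ...use row._1(), row._2()._1(), row._2()._2() to fill result
        return result.iterator(); // Spark 2.0 expects an Iterator, not an Iterable
    }
};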

Or, using a Java 8 lambda:

JavaRDD<Row> jd = coGroupedRDD
        .flatMap(t -> {
            List<Row> result = new ArrayList<>();
            // ...use t._1(), t._2()._1(), t._2()._2() to construct the result list
            return result.iterator();
        });
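What goes in the "..." depends on your use case. As one hypothetical example, the lambda could emit one output per (left, right) combination for each key, which is essentially an inner join. The sketch below is plain Java so it runs without Spark. It uses String instead of Row, and joinPerKey and CogroupJoinSketch are hypothetical names. Inside the Spark lambda, the same nested loop would iterate over t._2()._1() and t._2()._2() and finish with return result.iterator();.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class CogroupJoinSketch {
    // Hypothetical body for the flatMap lambda: given one cogrouped key and its
    // two value groups, emit one output per (left, right) pair (inner-join style).
    // A key with an empty group on either side produces no output.
    static Iterator<String> joinPerKey(String key, Iterable<String> left, Iterable<String> right) {
        List<String> result = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                result.add(key + ":" + l + "," + r);
            }
        }
        return result.iterator(); // Spark 2.0 style: return an Iterator, not the List
    }

    public static void main(String[] args) {
        Iterator<String> it = joinPerKey("a", Arrays.asList("1", "2"), Arrays.asList("x"));
        while (it.hasNext()) {
            System.out.println(it.next()); // prints a:1,x then a:2,x
        }
        // A key present on only one side yields nothing.
        System.out.println(joinPerKey("c", Collections.<String>emptyList(), Arrays.asList("y")).hasNext()); // false
    }
}
```

Returning an empty iterator is fine for flatMap. That key simply contributes no rows to the output RDD.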