Balaji Reddy - 6 months ago

How to Iterate DStream in Java

I'm new to Spark programming. I have a Spark Streaming program that needs to store the received DStream in a database. I want to iterate over my DStream and store each record in the database.

Something like this:

JavaStreamingContext streamingContext = getSparkStreamingContext();

JavaReceiverInputDStream<String> socketTextStream = streamingContext
        .socketTextStream("localhost", 8080);

DStream<String> dstream = socketTextStream.dstream();

// Iterate each record from the DStream and push it to DB

Answer

You can use JavaDStream.foreachRDD and JavaRDD.foreach:

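import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// getSparkStreamingContext() is the helper from the question
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
        .socketTextStream("localhost", 8080);

// The function passed to foreachRDD runs on the driver, once per batch
socketTextStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        // rdd.foreach ships this function to the executors, where it is called once per record
        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                // Save data
            }
        });
    }
});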

Or using Java 8 Lambda Expressions:

JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
        .socketTextStream("localhost", 8080);

socketTextStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
    rdd.foreach((VoidFunction<String>) s -> {
        // Save data
    });
});
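Keep in mind that the function given to rdd.foreach runs on the executors once per record, so opening a database connection inside it means one connection per record. The Spark Streaming Programming Guide ("Design Patterns for using foreachRDD") recommends rdd.foreachPartition instead: open one connection per partition and reuse it for every record in that partition. The sketch below models that pattern in plain Java so it runs without a cluster. It assumes a partition is an Iterator<String> (the argument foreachPartition hands to its VoidFunction<Iterator<String>>); RecordSink, SinkFactory, PartitionWriter and CountingFactory are hypothetical names standing in for your JDBC connection and the code that opens it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a database connection (in a real job, e.g. a wrapped java.sql.Connection).
interface RecordSink extends AutoCloseable {
    void save(String record) throws Exception;
}

// Hypothetical factory: in a real job this would open a JDBC connection or borrow one from a pool.
interface SinkFactory {
    RecordSink open() throws Exception;
}

final class PartitionWriter {
    private PartitionWriter() {}

    // Same shape as the body you would pass to rdd.foreachPartition(...):
    // one sink per partition, reused for every record, closed when the partition is done.
    static int writePartition(Iterator<String> records, SinkFactory factory) throws Exception {
        int written = 0;
        try (RecordSink sink = factory.open()) {
            while (records.hasNext()) {
                sink.save(records.next());
                written++;
            }
        }
        return written;
    }
}

// In-memory factory used only to exercise the sketch without a database;
// it counts how many "connections" were opened and keeps what was saved.
final class CountingFactory implements SinkFactory {
    int opened = 0;
    final List<String> saved = new ArrayList<>();

    @Override
    public RecordSink open() {
        opened++;
        return new RecordSink() {
            @Override
            public void save(String record) {
                saved.add(record);
            }

            @Override
            public void close() {
                // nothing to release for the in-memory sink
            }
        };
    }
}

final class PartitionWriterDemo {
    public static void main(String[] args) throws Exception {
        CountingFactory factory = new CountingFactory();
        // Two "partitions" of one micro-batch.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"));
        for (List<String> partition : partitions) {
            PartitionWriter.writePartition(partition.iterator(), factory);
        }
        // prints "2 connections, 5 records"
        System.out.println(factory.opened + " connections, " + factory.saved.size() + " records");
    }
}
```

In the streaming job the call would look roughly like socketTextStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> rdd.foreachPartition(records -> PartitionWriter.writePartition(records, factory))); on Spark 1.6 or later. Because the inner lambda runs on the executors, whatever it captures (here factory) must be serializable; the guide's alternative is a static, lazily initialized connection pool on each executor.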

Edit

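Since you're using Spark 1.2.0, which is fairly old (the latest release is 1.6.1 as of 22/05/2016, so you might consider upgrading), note that the VoidFunction overload of foreachRDD was only added in Spark 1.6. In 1.2.0 you have to pass a Function<JavaRDD<String>, Void> and return null: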

socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                // Save data
            }
        });
        return null;
    }
});
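The only change from the first version is the interface passed to foreachRDD: Function<JavaRDD<String>, Void> declares a java.lang.Void return type, and since Void cannot be instantiated, return null; is the only way to satisfy it. If you want to keep the per-record logic in one place across Spark versions, a small adapter can wrap a void-style body into a Void-returning one. The sketch below uses java.util.function.Consumer and Function rather than Spark's own interfaces, purely so it runs without Spark on the classpath; asVoidReturning is a hypothetical helper, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class VoidAdapters {
    private VoidAdapters() {}

    // Wraps a void-style body (the shape of Spark's VoidFunction) as a function returning
    // java.lang.Void (the shape of the Function<R, Void> that Spark 1.2's foreachRDD takes).
    // Void cannot be instantiated, so null is the only value the wrapper can return.
    static <T> Function<T, Void> asVoidReturning(Consumer<T> body) {
        return t -> {
            body.accept(t);
            return null;
        };
    }
}

final class VoidAdaptersDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Function<String, Void> saveRecord = VoidAdapters.asVoidReturning(seen::add);
        Void result = saveRecord.apply("record-1");
        System.out.println(seen + " " + result); // prints "[record-1] null"
    }
}
```

With Spark's interfaces the same idea would wrap a VoidFunction<JavaRDD<String>> in a Function<JavaRDD<String>, Void>, letting one body serve both the 1.2 and 1.6 overloads.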