Michael Lihs - 26 days ago
Scala Question

Spark and Java: Exception thrown in awaitResult

I am trying to connect to a Spark cluster running within a virtual machine with IP 10.20.30.50 and port 7077 from within a Java application and run the word count example:

SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md");
String result = Long.toString(textFile.count());
JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
counts.saveAsTextFile("hdfs://localhost:8020/tmp/output");
sc.stop();
return result;


The Java application shows the following stack trace:

Running Spark version 2.0.1
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Changing view acls to: lii5ka
Changing modify acls to: lii5ka
Changing view acls groups to:
Changing modify acls groups to:
SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set()
Successfully started service 'sparkDriver' on port 61267.
Registering MapOutputTracker
Registering BlockManagerMaster
Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63
MemoryStore started with capacity 2004.6 MB
Registering OutputCommitCoordinator
Logging initialized @48403ms
jetty-9.2.z-SNAPSHOT
Started o.s.j.s.ServletContextHandler@1316e7ec{/jobs,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@782de006{/jobs/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2d0353{/jobs/job,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@381e24a0{/jobs/job/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@1c138dc8{/stages,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@b29739c{/stages/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@63f6de31{/stages/stage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2a04ddcb{/stages/stage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2af9688e{/stages/pool,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@6a0c5bde{/stages/pool/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@3f5e17f8{/storage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@33b86f5d{/storage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5264dcbc{/storage/rdd,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5a3ebf85{/storage/rdd/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@159082ed{/environment,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@6522c585{/environment/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@115774a1{/executors,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@3e3a3399{/executors/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@2f2c5959{/executors/threadDump,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5c51afd4{/executors/threadDump/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@76893a83{/static,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@19c07930{/,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@54eb0dc0{/api,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@5953786{/stages/stage/kill,null,AVAILABLE}
Started ServerConnector@2eeb8bd6{HTTP/1.1}{0.0.0.0:4040}
Started @48698ms
Successfully started service 'SparkUI' on port 4040.
Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040
Connecting to master spark://10.20.30.50:7077...
Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps)
Connecting to master spark://10.20.30.50:7077...
Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed
Failed to connect to master 10.20.30.50:7077

org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final]
... 1 common frames omitted


In the Spark Master log on 10.20.30.50, I get the following error message:

16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580)
at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375)
at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at akka.actor.FSM$class.processEvent(FSM.scala:604)
at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89)
at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108)
at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643)
at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607)
at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703)
at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821)
at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168)
... 19 more


Additional Information


  • The example works fine when I use new SparkConf().setMaster("local") instead.

  • I can connect to the Spark Master with spark-shell --master spark://10.20.30.50:7077 on the very same machine.


Answer

At first glance this looks like a network error, but it is actually a Spark version mismatch in disguise. Point your application at the correct version of the Spark jars (mostly the assembly jars), i.e. the version the cluster is running.
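One way to convince yourself that this is not a plain network problem is to check whether the master's port accepts TCP connections at all. The sketch below is a hypothetical helper (the names PortCheck and isReachable are made up for illustration, they are not part of Spark) that uses only java.net.Socket. It says nothing about whether both sides speak the same protocol, which is the point here: the client log in the question already reports a successful connection to /10.20.30.50:7077 before the failure.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spark: checks TCP reachability only.
final class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat as not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the question; adjust for your own cluster.
        System.out.println(isReachable("10.20.30.50", 7077, 2000));
    }
}
```

If this prints true and the application still fails as shown, the network path is probably fine and the version of the jars is the next thing to look at.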

This issue can happen because of a version mismatch in a Hadoop RPC call that uses Protocol Buffers (protobuf).

InvalidProtocolBufferException is thrown when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length.

  • In my experience with protobuf, InvalidProtocolBufferException can only happen when the message could not be parsed (for example, if you parse a protobuf message programmatically and the message length is zero or the message is corrupted).

  • Spark 1.x uses Akka actors for message passing between the Master/Driver and the Workers, and internally Akka uses Google's protobuf to communicate, which is where the exception in your master log comes from. See the decodePdu method below, from AkkaPduCodec.scala:

    override def decodePdu(raw: ByteString): AkkaPdu = {
      try {
        val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
        if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
        else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
        else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
      } catch {
        case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e)
      }
    }
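
To see why a parser can report "invalid tag (zero)", recall that every protobuf field starts with a varint-encoded tag, computed as (field_number << 3) | wire_type, and that field number 0 is not valid. The following is a simplified sketch of that check in Java. It is not protobuf's actual CodedInputStream code, and TagCheck and readTag are illustrative names only. Bytes that were never a protobuf message, for example a buffer that begins with a zero byte, are rejected in the same way.

```java
// Simplified illustration of protobuf tag decoding; not the library's real code.
final class TagCheck {

    // Reads a base-128 varint from the start of data and returns it as a tag.
    // Throws if the varint is truncated, too long, or encodes field number 0.
    static int readTag(byte[] data) {
        int result = 0;
        for (int i = 0; i < 5; i++) {              // a 32-bit varint uses at most 5 bytes
            if (i >= data.length) {
                throw new IllegalArgumentException("truncated varint");
            }
            int b = data[i] & 0xFF;
            result |= (b & 0x7F) << (7 * i);       // the low 7 bits carry the payload
            if ((b & 0x80) == 0) {                 // high bit clear: this was the last byte
                if ((result >>> 3) == 0) {         // field number 0 is never valid
                    throw new IllegalArgumentException("invalid tag (zero)");
                }
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }

    public static void main(String[] args) {
        // Field 1 with wire type 0 is encoded as the single byte 0x08.
        System.out.println(readTag(new byte[] {0x08, 0x01}));   // prints 8
        try {
            readTag(new byte[] {0x00, 0x00, 0x05});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());                 // prints invalid tag (zero)
        }
    }
}
```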
    

But in your case, since it is a version mismatch, a message from the new protobuf version cannot be parsed by the old version of the parser, or something along those lines.

If you are using Maven, please review your other dependencies as well.
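
When reviewing dependencies, one quick check is to compare the Spark version embedded in the client jar names with the version the cluster runs. The stack trace in the question shows spark-core_2.11-2.0.1.jar on the client side. The sketch below parses such a file name with a regular expression. SparkJarVersion and its methods are hypothetical helpers written for this answer, and the cluster version is an input you have to supply yourself, for example from the banner that spark-shell prints on the cluster machine.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for comparing a client jar's Spark version with the cluster's.
final class SparkJarVersion {

    // Matches names such as spark-core_2.11-2.0.1.jar:
    // group 1 = Scala binary version, group 2 = Spark version.
    private static final Pattern JAR_NAME =
            Pattern.compile("spark-[a-z-]+_(\\d+\\.\\d+)-(\\d+\\.\\d+\\.\\d+)\\.jar");

    // Returns the Spark version from the jar file name, or null if the name does not match.
    static String sparkVersion(String jarName) {
        Matcher m = JAR_NAME.matcher(jarName);
        return m.matches() ? m.group(2) : null;
    }

    // True when the jar's Spark version equals the given cluster version exactly.
    static boolean matchesCluster(String jarName, String clusterVersion) {
        String version = sparkVersion(jarName);
        return version != null && version.equals(clusterVersion);
    }

    public static void main(String[] args) {
        String jar = "spark-core_2.11-2.0.1.jar";   // jar name from the stack trace
        System.out.println("client Spark version: " + sparkVersion(jar));
        if (args.length > 0) {
            // args[0] is the cluster's Spark version, supplied by you.
            System.out.println("matches cluster: " + matchesCluster(jar, args[0]));
        }
    }
}
```

An exact string comparison is deliberately strict here, on the assumption that client and cluster should run the same Spark release.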
