LearningSlowly LearningSlowly - 1 month ago 15
Scala Question

GraphX - Weighted shortest path implementation - java.lang.NoSuchMethodError

Edit - I discovered that the book was written for scala

1.6
but the remainder is
2.11
.

I am trying to implement a weighted shortest path algorithm from Michael Malak and Robin East's
Spark GraphX in Action
book. The part in question is Listing 6.4 "Executing the shortest path algorithm that uses breadcrumbs" from Chapter 6 here.

I have my own graph that I create from two RDDs. There are
344436
vertices and
772983
edges. I can perform an unweighted shortest path computation using the native GraphX library and I'm confident in the graph construction.

In this case I use their Dijkstra's implementation as follows:

val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
var g2 = g.mapVertices(
(vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
)

for (i <- 1L to g.vertices.count-1) {
val currentVertexId = g2.vertices
.filter(!_._2._1)
.fold((0L, (false, Double.MaxValue, List[VertexId]())))(
(a,b) => if (a._2._2 < b._2._2) a else b)
)
._1

val newDistances = g2.aggregateMessages[(Double, List[VertexId])](
ctx => if (ctx.srcId == currentVertexId) {
ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
},
(a,b) => if (a._1 < b._1) a else b
)

g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
val newSumVal = newSum.getOrElse((Double.MaxValue,List[VertexId]()))

(
vd._1 || vid == currentVertexId,
math.min(vd._2, newSumVal._1),
if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
)
})

}

g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
(vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
.productIterator.toList.tail
))
}

// Path Finding - random node from which to find all paths
val v1 = 4000000028222916L


I then call their function with my graph and a random vertex ID. Previously I had issues with
v1
not being recognised as
long
type and the
L
suffix solved this.

val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

println(results)


However, this returns the following:

Error: Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at GraphX$.dijkstra$1(GraphX.scala:51)
at GraphX$.main(GraphX.scala:85)
at GraphX.main(GraphX.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Line 51 refers to the line
var g2 = g.mapVertices(

Line 85 refers to the line
val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect


What method is this exception referring to? I am able to package with
sbt
without error and I canno see what method I am calling whcih does not exist.

Answer

The issue wasn't in a version error nor a missing implementation but a misleading error from the compiler.

Ok so here is the thing: After investigating the code, I have noticed that the following section contained one extra closing parentheses :

val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
  .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
    (a, b) => if (a._2._2 < b._2._2) a else b))._1 
                                              ^
                                              |

You'll just need to remove that extra parentheses and it will work perfectly. Here is the full code :

// scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.graphx._
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  var g2 = g.mapVertices(
(vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
  )

  for (i <- 1L to g.vertices.count - 1) {
    val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
        (a, b) => if (a._2._2 < b._2._2) a else b)._1

    val newDistances: VertexRDD[(Double, List[VertexId])] =
      g2.aggregateMessages[(Double, List[VertexId])](
    ctx => if (ctx.srcId == currentVertexId) {
      ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
    },
    (a, b) => if (a._1 < b._1) a else b
  )

g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
  val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
  (
    vd._1 || vid == currentVertexId,
    math.min(vd._2, newSumVal._1),
    if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
    )
})
}

  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
(vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
  .productIterator.toList.tail
  ))
}

//  Path Finding - random node from which to find all paths

Now, let's test it :

val myVertices: RDD[(VertexId, String)] = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
val myEdges: RDD[Edge[Double]] = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0), Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0), Edge(3L, 5L, 5.0), Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0), Edge(6L, 7L, 11.0)))

val my_graph = Graph(myVertices, myEdges).cache()

val v1 = 4000000028222916L

val results = dijkstra(my_graph, 1L).vertices.map(_._2).collect

// [CTRL-D]
// Exiting paste mode, now interpreting.
// [Lscala.Tuple2;@668a0785                                                        
// import org.apache.spark.graphx._
// myVertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = ParallelCollectionRDD[556] at makeRDD at <console>:37
// myEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = ParallelCollectionRDD[557] at makeRDD at <console>:39
// my_graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@49ea0d90
// dijkstra: [VD](g: org.apache.spark.graphx.Graph[VD,Double], origin: org.apache.spark.graphx.VertexId)org.apache.spark.graphx.Graph[(VD, List[Any]),Double]
// v1: Long = 4000000028222916
// results: Array[(String, List[Any])] = Array((A,List(0.0, List())), (B,List(7.0, List(1))), (C,List(15.0, Li...
scala> results.foreach(println)
// (A,List(0.0, List()))
// (B,List(7.0, List(1)))
// (C,List(15.0, List(1, 2)))
// (D,List(5.0, List(1)))
// (E,List(14.0, List(1, 2)))
// (F,List(11.0, List(1, 4)))
// (G,List(22.0, List(1, 4, 6)))