Stéphanie C - 22 days ago
Scala Question

How to create a graph from Array[(Any, Any)] using Graph.fromEdgeTuples

I am very new to Spark, but I want to create a graph from relations that I get from a Hive table. I found a function that is supposed to allow this without defining the vertices, but I can't get it to work.

I know this isn't a reproducible example, but here is my code:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line => (line(2), line(3))) // country to country

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

The last line generates the following error:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)
<console>:31: error: type mismatch;
found : Array[(Any, Any)]
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)]
Error occurred in an application involving default arguments.
val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1)

couples looks like this:


How can I convert it to a suitable format?


First of all, you cannot use a String as a VertexId: in GraphX, VertexId is a type alias for Long, so the labels have to be mapped to Long values. To do that, we need to prepare a mapping from label to id. As long as the number of unique values is relatively small, the simplest approach is to build the map on the driver and share it as a broadcast variable:

val idMap = sc.broadcast(couples // -> Array[(Any, Any)]
  // Make sure we use String not Any returned from Row.apply
  // And convert to Seq so we can flatten results
  .flatMap{case (x: String, y: String) => Seq(x, y)} // -> Array[String]
  // Get different keys
  .distinct // -> Array[String]
  // Create (key, value) pairs
  .zipWithIndex  // -> Array[(String, Int)]
  // Convert values to Long so we can use it as a VertexId
  .map{case (k, v) => (k, v.toLong)}  // -> Array[(String, Long)]
  // Create map
  .toMap) // -> Map[String,Long]

Next, we can use this map to translate each pair of labels into a pair of ids:

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
  .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})
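To see what the two steps above produce, here is a minimal sketch that runs them on a plain local array, without Spark or a broadcast variable. The three sample pairs and the names labelToId and edgeIds are hypothetical stand-ins for the real couples, idMap.value and edges; only the shape of the data is meant to match.

```scala
// Hypothetical sample standing in for `couples`; typed as (Any, Any)
// because that is what Row.apply gives back in the question.
val couples: Array[(Any, Any)] =
  Array(("FRA", "DEU"), ("DEU", "USA"), ("FRA", "USA"))

// Same chain as in the answer, kept local instead of broadcast.
val labelToId: Map[String, Long] = couples
  .flatMap { case (x: String, y: String) => Seq(x, y) } // all labels
  .distinct                                             // keeps first-seen order
  .zipWithIndex                                         // (label, Int index)
  .map { case (k, v) => (k, v.toLong) }                 // Long, like VertexId
  .toMap

// Translate every pair of labels into a pair of ids.
val edgeIds: Array[(Long, Long)] = couples
  .map { case (x: String, y: String) => (labelToId(x), labelToId(y)) }

println(labelToId)                 // Map(FRA -> 0, DEU -> 1, USA -> 2)
println(edgeIds.mkString(", "))    // (0,1), (1,2), (0,2)
```

Note that the pattern match on (x: String, y: String) throws a MatchError if a value is not a String (for example a null partner code), so it may be worth filtering such rows out first.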

Finally, we build the graph, using 1 as the default attribute for every vertex:

val graph = Graph.fromEdgeTuples(edges, 1)
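Since the graph only knows the Long ids, any result read back from it has to be translated to labels again. A minimal sketch of that reverse lookup, assuming a small hand-written map in place of idMap.value and hypothetical (id, degree) pairs shaped like the output of graph.degrees.collect():

```scala
// Hypothetical stand-in for idMap.value.
val labelToId: Map[String, Long] = Map("FRA" -> 0L, "DEU" -> 1L, "USA" -> 2L)

// Invert the map; this is safe because every label got a distinct id.
val idToLabel: Map[Long, String] = labelToId.map(_.swap)

// Hypothetical (vertexId, degree) pairs, as a collected result might look.
val degreesById: Seq[(Long, Int)] = Seq((0L, 2), (1L, 2), (2L, 2))

// Replace each id with its original label.
val degreesByLabel: Seq[(String, Int)] =
  degreesById.map { case (id, degree) => (idToLabel(id), degree) }

println(degreesByLabel) // List((FRA,2), (DEU,2), (USA,2))
```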