Asked by alukard990, 26 days ago

Scala-Spark: Convert Dataframe to RDD[Edge]

I have a DataFrame that represents the edges of a graph. This is its schema:

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- relationship: struct (nullable = false)
 |    |-- business_id: string (nullable = true)
 |    |-- normalized_influence: double (nullable = true)


I want to convert it to an RDD[Edge] so I can use the Pregel API, but I'm having trouble with the "relationship" attribute. How can I convert it?

Answer

Edge is a parameterized class, Edge[ED]: besides the source and destination vertex IDs, each edge carries an attribute of whatever type ED you choose. In your case it would probably be an Edge[Relationship]. You can use case classes to map the DataFrame rows and then build the RDD[Edge[Relationship]]:

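import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD
import scala.util.hashing.MurmurHash3

case class Relationship(business_id: String, normalized_influence: Double)
case class MyEdge(src: String, dst: String, relationship: Relationship)

// Encoders needed by df.as[MyEdge]; assumes a SparkSession named `spark`, as in spark-shell
import spark.implicits._

val edges: RDD[Edge[Relationship]] = df.as[MyEdge].rdd.map { edge =>
  Edge(
    // VertexId is an alias for Long, so each string ID must be turned into a number.
    // stringHash returns a 32-bit Int, so two distinct IDs can collide.
    MurmurHash3.stringHash(edge.src).toLong,
    MurmurHash3.stringHash(edge.dst).toLong,
    edge.relationship // the nested struct becomes the edge attribute
  )
}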
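Because MurmurHash3.stringHash produces only a 32-bit Int, different vertex names can end up with the same VertexId. For a well-mixed 32-bit hash, the chance of at least one collision passes 50% at roughly 77,000 distinct names. A common alternative is dictionary encoding, which assigns each distinct name its own sequential Long. The sketch below models that idea with plain Scala collections so it runs without Spark. SimpleEdge, buildIdMap, and encode are hypothetical names introduced here, and SimpleEdge is only a stand-in for GraphX's Edge[ED].

```scala
import scala.util.hashing.MurmurHash3

object VertexIdEncoding {
  case class Relationship(business_id: String, normalized_influence: Double)
  case class MyEdge(src: String, dst: String, relationship: Relationship)

  // Hypothetical stand-in for GraphX's Edge[ED]: source ID, destination ID, attribute.
  case class SimpleEdge[ED](srcId: Long, dstId: Long, attr: ED)

  // Hash-based ID, as in the answer: deterministic, but only 32 bits of range.
  def hashId(name: String): Long = MurmurHash3.stringHash(name).toLong

  // Dictionary encoding: every distinct vertex name gets its own Long, so no collisions.
  // distinct keeps first-occurrence order, so IDs are assigned 0, 1, 2, ... in that order.
  def buildIdMap(edges: Seq[MyEdge]): Map[String, Long] =
    edges
      .flatMap(e => Seq(e.src, e.dst))
      .distinct
      .zipWithIndex
      .map { case (name, i) => name -> i.toLong }
      .toMap

  // Rewrite each edge's string endpoints to their numeric IDs, keeping the attribute.
  def encode(edges: Seq[MyEdge], ids: Map[String, Long]): Seq[SimpleEdge[Relationship]] =
    edges.map(e => SimpleEdge(ids(e.src), ids(e.dst), e.relationship))
}
```

In Spark the same idea would usually apply RDD.zipWithUniqueId() (or zipWithIndex()) to the distinct vertex names and then join the resulting (name, id) pairs back onto src and dst. That costs extra shuffles compared with hashing, so hashing can still be a reasonable choice for small graphs, ideally after checking that the number of distinct hashes equals the number of distinct names.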