Azeem Akhter - 1 month ago
Python Question

Join two RDDs to make an adjacency list

I am new to Spark. I have two RDDs which I want to join to make an adjacency list:

RDD1 (nodes): (a, b, c, d, e, f, g)

RDD2 (Edges): ((a,b), (a,e), (f, a), (k,l) ...)


Now I want to join these two RDDs to create an adjacency list like this:

((a, (b, e, ...)), (b, (f, ...)), (g, ()), ...)
# assuming that g is not connected to any node; also filter out (k, l) because k and l are not in the nodes RDD


Later on, I also need to find the total count of nodes and edges.

Answer

If I understand correctly, you want an adjacency list where the final RDD consists of key-value pairs, with the key being the node and the value a list of its edges. Perhaps something like the code below is what you had in mind? Note that if you want 'g' to appear in your final RDD, it would make sense to include it in your edges list as ('g', ''), since you want to convey that it has no edges.

To join, we need to convert the nodes list to a pair RDD: first we parallelize it to create the RDD, and then map each node to a (node, dummy value) tuple so that we have key-value pairs.

Now we can join the two RDDs; the result contains only the keys that exist in both RDDs, in this case 'a' and 'f'. Since an edge's key is its source node, only the source is checked against the nodes list. Finally, we strip the dummy value we added to the nodes RDD and call groupByKey to group the values together.
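To make the join semantics concrete, here is a plain-Python model (no Spark required) of what the join, dummy-stripping and groupByKey steps produce. The `inner_join` helper is hypothetical and written only for illustration; it is not a Spark API. The extra edge ('a', 'z') is also hypothetical: it shows that an edge survives the join as long as its source is a node, while ('k', 'l') is dropped.

```python
from collections import defaultdict

nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
# ('a', 'z') is a hypothetical extra edge whose target is not a node.
edges = [('a', 'b'), ('a', 'e'), ('f', 'a'), ('k', 'l'), ('a', 'z')]

# Like nodesRDD: pair each node with a dummy value.
node_pairs = [(n, '') for n in nodes]

def inner_join(left, right):
    """Hypothetical model of a pair-RDD inner join: emit (k, (v, w))
    for every left/right pair of records that share the key k."""
    right_by_key = defaultdict(list)
    for k, w in right:
        right_by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in right_by_key.get(k, [])]

# Join on the key, then keep only the edge's target (drop the dummy).
joined = [(k, vw[1]) for k, vw in inner_join(node_pairs, edges)]

# Like groupByKey: collect all targets per source node.
grouped = defaultdict(list)
for k, v in joined:
    grouped[k].append(v)

print(dict(grouped))  # {'a': ['b', 'e', 'z'], 'f': ['a']}
```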

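from pyspark import SparkContext

sc = SparkContext.getOrCreate()  # already available as sc in the pyspark shell

nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
# Pair each node with a dummy value so it can be joined on its name
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, ''))
edgesRDD = sc.parallelize(edges)
# Inner join on the edge's source node, then keep only the edge's target
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
groupedRDD = joinedRDD.groupByKey()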

groupedRDD.mapValues(list).collect()  # turn each group's iterable into a list

Out[146]: [('f', ['a']), ('a', ['b', 'e'])]
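The output above leaves out 'g', and because the join matches on the source node, an edge whose target is not in the nodes list would not be filtered. Here is a plain-Python sketch of the result the question describes: edges are kept only when both endpoints are nodes, and every node gets an entry, with an empty list when it has no edges. In PySpark the same idea could plausibly be expressed with `filter` against a broadcast set of node names (`sc.broadcast`) and `nodesRDD.leftOuterJoin(edgesRDD)`, which keeps every node and yields `None` for nodes without edges; treat that mapping as an untested assumption. This also avoids the ('g', '') placeholder, which in the pipeline above would produce ('g', ['']) rather than an empty list.

```python
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a', 'b'), ('a', 'e'), ('f', 'a'), ('k', 'l')]

node_set = set(nodes)

# Keep only edges whose source AND target are known nodes
# (in Spark: a filter against a broadcast set of node names).
valid_edges = [(src, dst) for src, dst in edges
               if src in node_set and dst in node_set]

# Model of a left outer join: every node gets an entry, even with no edges
# (in Spark: nodesRDD.leftOuterJoin(...), then drop the None values).
adjacency = {n: [] for n in nodes}
for src, dst in valid_edges:
    adjacency[src].append(dst)

print(adjacency)
# {'a': ['b', 'e'], 'b': [], 'c': [], 'd': [], 'e': [], 'f': ['a'], 'g': []}
```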

Counting is similar, but now we do not care about the actual node values, only how many edges each node has:

nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a','b'), ('a','e'), ('f', 'a'), ('k','l')]
# Dummy value 0 for each node so it can be joined on its name
nodesRDD = sc.parallelize(nodes).map(lambda n: (n, 0))
# Each edge contributes a 1 to its source node
edgesRDD = sc.parallelize(edges).map(lambda tup: (tup[0], 1))
joinedRDD = nodesRDD.join(edgesRDD).map(lambda tup: (tup[0], tup[1][1]))
# Sum the 1s per source node to get its edge count
reducedRDD = joinedRDD.reduceByKey(lambda a, b: a + b)

reducedRDD.collect()

Out[159]: [('f', 1), ('a', 2)]
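The result above is a per-node edge count (out-degree), not the overall totals mentioned in the question. A plain-Python sketch of how the totals relate to it, assuming "edges" means the edges that survive the join: the node total is just the size of the node list, and the edge total is the sum of the per-node counts. In PySpark, `nodesRDD.count()` and `joinedRDD.count()` (or `reducedRDD.values().sum()`) should give the same numbers.

```python
nodes = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
edges = [('a', 'b'), ('a', 'e'), ('f', 'a'), ('k', 'l')]
node_set = set(nodes)

# Per-node out-degree, as the reduceByKey pipeline above computes it.
out_degree = {}
for src, _ in edges:
    if src in node_set:  # the join drops edges whose source is not a node
        out_degree[src] = out_degree.get(src, 0) + 1

# Totals the question asks for.
total_nodes = len(node_set)             # in Spark: nodesRDD.count()
total_edges = sum(out_degree.values())  # in Spark: joinedRDD.count()

print(out_degree, total_nodes, total_edges)  # {'a': 2, 'f': 1} 7 3
```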