HackerDuck - 3 months ago
Scala Question

Getting List[Serializable] instead of List[(String,String)]

My initial RDD has the type RDD[(String, List[(String,String)])] and looks as follows:

    (600,List((22,33),(55,88)))
    (700,List((12,13),(15,18),(18,88)))


I want to append additional data, fetched from a Redis cache, to each entry. To do this, I use Sedis, a Scala wrapper for Jedis. This is a fragment of my code:

    import org.sedis._
    import redis.clients.jedis._

    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))

    val appended = filtered.map({line => (line._1,
      redisPool.withJedisClient { client =>
        val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
        line._2.union(additionalData)
      })
    })


The problem is that appended has the type RDD[(String, List[Serializable])] instead of RDD[(String, List[(String,String)])]. What am I doing wrong?
Also, is the way I use redisPool inside map efficient enough, or is there a better option?

Answer
  1. line._2.union(additionalData) combines line._2, which has the type List[(String, String)], with additionalData, which has the type List[String]. The element type of the result must be the most specific common supertype (the least upper bound) of (String, String) and String, which is Serializable, so the result is List[Serializable]. If additionalData had the type List[(String, String)], the result would have been List[(String, String)] as well.

  2. As for the efficiency of the JedisPool usage: when opening connections to an external resource from a Spark transformation, you should usually use mapPartitions, which executes the given function once per RDD partition rather than once per record. Why? In your current implementation, the pool is created in the driver application, then serialized and shipped to each executor, where it is deserialized and used in the mapping. This usually fails: the pool holds connections (open sockets) that exist only on the driver where it was created, and typically Spark cannot serialize it at all (a "Task not serializable" error). One (inefficient) alternative would be to create the pool inside the map function, once per record. The better option is to use mapPartitions and create one pool per partition:

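    import org.sedis._
    import redis.clients.jedis._

    val appended = filtered.mapPartitions(iter => {
      // Create the pool on the executor, once per partition rather than once per record
      val jedisPool = new JedisPool(new JedisPoolConfig(), "jedis-host", 6379, 2000)
      val redisPool = new Pool(jedisPool)

      // iter.map is lazy, so materialize the partition before closing the pool
      // (this holds one partition's results in memory)
      val results = iter.map({line => (line._1,
        redisPool.withJedisClient { client =>
          val additionalData: List[String] = Dress.up(client).hvals("member_id:"+line._1)
          line._2.union(additionalData)
        })
      }).toList

      jedisPool.close() // release this partition's connections
      results.iterator
    })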
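To illustrate point 1 without Spark or Redis, here is a minimal sketch in plain Scala. It assumes you want the extra data as (field, value) pairs; extraFields is a hypothetical stand-in for a Redis hash's field/value pairs (Jedis's hgetAll returns such a map, as a java.util.Map), and the sample values are made up. Once the appended elements are already (String, String), the element type is no longer widened.

```scala
object AppendPairs {
  type Entry = (String, List[(String, String)])

  val pairs: List[(String, String)] = List(("22", "33"), ("55", "88"))
  val values: List[String] = List("x", "y")

  // No annotation: the compiler infers the least upper bound of
  // (String, String) and String, so this is a List[java.io.Serializable]
  val widened = pairs ++ values

  // Hypothetical fix: convert the extra data to (String, String) before appending,
  // so the entry keeps the type (String, List[(String, String)])
  def appendPairs(entry: Entry, extraFields: Map[String, String]): Entry = {
    val (id, existing) = entry
    (id, existing ++ extraFields.toList)
  }

  def main(args: Array[String]): Unit = {
    println(widened) // List((22,33), (55,88), x, y)
    val entry: Entry = ("600", pairs)
    println(appendPairs(entry, Map("name" -> "alice"))) // (600,List((22,33), (55,88), (name,alice)))
  }
}
```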
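The per-partition pattern from point 2 can also be sketched without Spark or Redis. In this minimal sketch, Connection is a hypothetical stand-in for the Redis pool and each partition is a plain Iterator; enrich has the same Iterator => Iterator shape as the function passed to mapPartitions. It opens one connection per partition and materializes the results before closing, because Iterator.map is lazy: enrichTooEarly closes first, and consuming its iterator afterwards fails.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical resource: records open/close events and refuses lookups once closed
final class Connection(val log: ListBuffer[String]) {
  private var open = true
  log += "open"

  def lookup(id: String): String = {
    require(open, "connection already closed")
    s"extra-$id"
  }

  def close(): Unit = { open = false; log += "close" }
}

object PerPartition {
  // One connection per partition; toList forces every lookup while it is still open
  def enrich(log: ListBuffer[String])(iter: Iterator[String]): Iterator[(String, String)] = {
    val conn = new Connection(log)
    val out = iter.map(id => (id, conn.lookup(id))).toList
    conn.close()
    out.iterator
  }

  // Anti-pattern: iter.map is lazy, so no lookup has happened when close() runs
  def enrichTooEarly(log: ListBuffer[String])(iter: Iterator[String]): Iterator[(String, String)] = {
    val conn = new Connection(log)
    val out = iter.map(id => (id, conn.lookup(id)))
    conn.close()
    out // consuming this throws IllegalArgumentException from require
  }

  def main(args: Array[String]): Unit = {
    val log = ListBuffer.empty[String]
    val partitions = List(Iterator("600", "700"), Iterator("800"))
    val results = partitions.flatMap(p => enrich(log)(p).toList)
    println(results) // List((600,extra-600), (700,extra-700), (800,extra-800))
    println(log)     // ListBuffer(open, close, open, close)
  }
}
```

Materializing with toList keeps a whole partition in memory; if partitions are large, an alternative is to close the connection only once the returned iterator's hasNext returns false.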