Java Question

How to use data from Mongo and PostgreSQL as in-memory lookup tables?

It's a continuation of this question: Porting a multi-threaded compute intensive job to spark

I am using `forEachPartition` as suggested here to loop over a list of 10,000 IDs, and then I do a `repartition(20)`, because each partition creates a DB connection; if I create, say, 100 partitions, the job just dies because of 100 open connections to Postgres and Mongo. I use the Postgres connections not only to store data but also to look up data from another table. I can stop storing data to Postgres directly from my task and instead do it as post-processing from a sequence file.
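
For reference, my current `forEachPartition`/`repartition(20)` loop looks roughly like the sketch below; the class name, connection string, and `processId` helper are simplified placeholders, not my actual code.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CurrentSetupSketch {

    // placeholder connection string
    private static final String POSTGRES_URL =
            "jdbc:postgresql://host:5432/mydb?user=me&password=secret";

    public static void run(JavaSparkContext sc, List<Long> idList) {
        // 20 partitions => at most 20 simultaneous DB connections
        JavaRDD<Long> ids = sc.parallelize(idList).repartition(20);

        ids.foreachPartition(partition -> {
            // one connection per partition, reused for every ID in it
            try (Connection pg = DriverManager.getConnection(POSTGRES_URL)) {
                while (partition.hasNext()) {
                    processId(partition.next(), pg);
                }
            }
        });
    }

    // placeholder for the per-ID lookup + simulation + store logic
    private static void processId(Long id, Connection pg) {
    }
}
```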

But ideally I need to massively parallelize my Spark job so that the task completes within a given time. Currently it processes about 200 IDs in 20 hours, but I need to process 10,000 IDs in 20 hours, so `repartition(20)` is clearly not helping. I am bound by I/O on the DB here.

So what are my options for efficiently sharing this data across all tasks? I want the data in Mongo and Postgres to be treated as in-memory lookup tables; the total size is about 500 GB.

My options are:

  1. RDD (I don't think an RDD fits my use case)

  2. DataFrame (a rough sketch of this option follows the list)

  3. Broadcast variables (not sure this will work, since creating one needs 500 GB available on the Spark driver)

  4. Move the data from Mongo to S3 and have tasks look it up from S3.
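
For option 2, I assume it would look roughly like the following with Spark's JDBC data source; the URL, table name, credentials, and the Spark 2.x SparkSession API are just assumptions for illustration.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameOptionSketch {

    public static Dataset<Row> loadLookupTable(SparkSession spark) {
        // placeholder URL and table name; reads the Postgres lookup table
        // into a DataFrame that tasks could join against instead of opening
        // their own connections
        return spark.read()
                .format("jdbc")
                .option("url", "jdbc:postgresql://host:5432/mydb")
                .option("dbtable", "lookup_table")
                .option("user", "me")
                .option("password", "secret")
                .load();
    }
}
```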


Answer

Old post here, but I ended up with this solution.

  1. Create a `JavaPairRDD<Long, YourType>`, where `Long` is the ID and `YourType` can be a list, an object, or whatever the data in Postgres or Mongo represents for that ID.

  2. Say we end up with 3 data sources for an ID. We join those 3 JavaPairRDDs and end up with a localized tuple: `JavaRDD<Tuple4<Long, YourType1, YourType2, YourType3>>`. This tuple is all I need, and it is passed to the simulation logic.

  3. Once we have our JavaRDD, we `repartition(n)` it, where n is the total number of partitions I can handle. (A rough sketch of these three steps follows this list.)
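
Here is a rough, self-contained sketch of steps 1 to 3, assuming the three per-ID pair RDDs have already been loaded from Postgres and Mongo; the generic types A, B, C stand in for YourType1/2/3, and the class and method names are illustrative.

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;
import scala.Tuple4;

public class LocalizeSketch {

    // source1/source2/source3 are the per-ID pair RDDs from step 1
    // (e.g. one from Postgres, two from Mongo); n is the number of
    // partitions the cluster can handle in step 3.
    public static <A, B, C> JavaRDD<Tuple4<Long, A, B, C>> localize(
            JavaPairRDD<Long, A> source1,
            JavaPairRDD<Long, B> source2,
            JavaPairRDD<Long, C> source3,
            int n) {

        // Step 2: join on the ID key; the result nests the values
        // as Tuple2<Tuple2<A, B>, C>.
        JavaPairRDD<Long, Tuple2<Tuple2<A, B>, C>> joined =
                source1.join(source2).join(source3);

        // Flatten the nested tuples into one Tuple4 per ID, so the
        // simulation logic gets everything it needs locally.
        JavaRDD<Tuple4<Long, A, B, C>> localized = joined.map(t ->
                new Tuple4<>(t._1(),
                             t._2()._1()._1(),
                             t._2()._1()._2(),
                             t._2()._2()));

        // Step 3: repartition to the level of parallelism we can handle;
        // no DB calls happen inside the distributed code anymore.
        return localized.repartition(n);
    }
}
```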

I didn't have to do an in-memory lookup at all, because the data is properly localized inside an RDD (step 2). By removing all I/O (DB calls) from the distributed tasks (i.e. code running inside map-like operations in Spark), I achieved the expected execution time. I am now able to process about 200 IDs in an hour (most of the time is taken by the actual simulation logic); data loading etc. takes under 10 minutes.