nickpick - 1 month ago
Python Question

pool.apply_async with multiple parameters

The code below should call two databases at the same time. I tried to do it with ThreadPool but ran into some difficulties. pool.apply_async doesn't seem to allow multiple parameters, so I put them into a tuple and then tried to unpack them. Is this the right approach, or is there a better solution?

The list of tuples is defined in params=..., and each tuple has 3 entries. I would expect the function to be called twice, each time with 3 parameters.

from multiprocessing.pool import ThreadPool

import pandas as pd

# The two functions below are methods of a class (not shown) that provides
# self.logger, the connect_* helpers and the db_dbrs_* connections.

def get_sql(self, *params):  # run with risk
    # apply_async(func, args) calls func(*args), so params == (sql, schema, db)
    self.logger.info(len(params))
    sql = params[0]
    schema = params[1]
    db = params[2]
    self.logger.info("Running SQL with schema: {0}".format(schema))
    df = pd.read_sql(sql, db)
    return df

def compare_prod_uat(self):
    self.connect_dbrs_prod_db()
    self.connect_dbrs_uat_db()
    self.logger.info("connected to UAT and PROD database")

    sql = """ SELECT * FROM TABLE """

    # not used below: the two apply_async calls pass each tuple directly
    params = [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)]
    pool = ThreadPool(processes=2)
    self.logger.info("Calling Pool")
    result_prod = pool.apply_async(self.get_sql, (sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod))
    result_uat = pool.apply_async(self.get_sql, (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat))

    # df_prod = self.get_sql(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod)
    # df_cuat = self.get_sql(sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)

    self.logger.info("Get return from uat")
    df1 = result_uat.get()  # get return value from the database call

    self.logger.info("Get return from prod")
    df2 = result_prod.get()  # get second return value from the database call

    # no more tasks will be submitted; wait for the worker threads to exit
    pool.close()
    pool.join()

    return df1, df2
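A minimal, self-contained sketch of the same two concurrent calls, with the database replaced by a hypothetical get_sql stub and placeholder strings (prod_conn, uat_conn) standing in for the connection objects. It shows that apply_async already unpacks its args tuple into positional arguments (keyword arguments can go in its kwds dict), so get_sql can take named parameters instead of *params, and that ThreadPool.starmap does the same for a whole list of tuples such as params.

```python
from multiprocessing.pool import ThreadPool


def get_sql(sql, schema, db):
    # Hypothetical stand-in for pd.read_sql(sql, db): returns a label instead of a DataFrame.
    return f"{schema}@{db}: {sql}"


sql = "SELECT * FROM TABLE"
params = [
    (sql, "DF_RISK_PRD_OWNER", "prod_conn"),  # placeholder connection objects
    (sql, "DF_RISK_CUAT_OWNER", "uat_conn"),
]

with ThreadPool(processes=len(params)) as pool:
    # Option 1: apply_async(func, args) calls func(*args), so the named
    # parameters receive the tuple elements directly; no manual unpacking.
    result_prod = pool.apply_async(get_sql, params[0])
    result_uat = pool.apply_async(get_sql, params[1])
    df_prod, df_uat = result_prod.get(), result_uat.get()

    # Option 2: starmap unpacks every tuple in the list and blocks until all
    # calls have finished, returning results in the same order as params.
    df_prod2, df_uat2 = pool.starmap(get_sql, params)

print(df_prod)  # DF_RISK_PRD_OWNER@prod_conn: SELECT * FROM TABLE
print(df_uat)   # DF_RISK_CUAT_OWNER@uat_conn: SELECT * FROM TABLE
```

The with block terminates the pool on exit, so every .get() has to happen inside it. Whether a real database connection may be used from a worker thread depends on the driver.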

Answer

There may be many things wrong, but if you add

print(params)

as the first line of your get_sql, you'll see that you send in a tuple (sql, [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, .....)])

So yes, the length of params is always two: the first element is sql (whatever that is in your implementation), and the second is a list of tuples of length three. I don't understand why you are passing (sql, params) to apply_async instead of just (params,), since sql already appears in each tuple. If it does need to be there, your list is in params[1].
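A small reproduction of what the answer describes, assuming the earlier version of the call passed (sql, params) to apply_async. The worker is a hypothetical stub with the same *params signature (without self), and the connection objects are placeholder strings.

```python
from multiprocessing.pool import ThreadPool


def get_sql(*params):
    # Hypothetical stub: returns exactly the positional arguments it received.
    return params


sql = "SELECT * FROM TABLE"
params = [(sql, "DF_RISK_PRD_OWNER", "prod_conn"),
          (sql, "DF_RISK_CUAT_OWNER", "uat_conn")]

with ThreadPool(processes=1) as pool:
    # Passing (sql, params) sends exactly two positional arguments:
    # the SQL string and the whole list of tuples.
    received = pool.apply_async(get_sql, (sql, params)).get()

print(len(received))           # 2
print(received[1] == params)   # True: the list arrives as the second argument
```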

However, I don't understand how your worker function would traverse this list. It seems built to execute only one SQL statement, since it has no for loop. Maybe you intended to put the for loop in your compare_prod_uat function and spawn as many workers as there are elements in your list? I'm not sure, but as written it doesn't make much sense.

The parameter issue can be fixed by this, though.
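A sketch of the structure the answer guesses at, using a hypothetical get_sql stub and placeholder connection strings: the worker handles exactly one statement per call, and the for loop lives in the caller, submitting one apply_async task per tuple in params and then collecting the results in submission order.

```python
from multiprocessing.pool import ThreadPool


def get_sql(sql, schema, db):
    # Hypothetical stand-in for a single query: one SQL statement per call.
    return f"{schema}@{db}"


sql = "SELECT * FROM TABLE"
params = [(sql, "DF_RISK_PRD_OWNER", "prod_conn"),
          (sql, "DF_RISK_CUAT_OWNER", "uat_conn")]

with ThreadPool(processes=len(params)) as pool:
    # The for loop is in the caller: submit one task per tuple...
    pending = []
    for args in params:
        pending.append(pool.apply_async(get_sql, args))
    # ...then block on each result, in the order the tasks were submitted.
    results = [r.get() for r in pending]

print(results)  # ['DF_RISK_PRD_OWNER@prod_conn', 'DF_RISK_CUAT_OWNER@uat_conn']
```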
