pool.apply_async 有多个参数
pool.apply_async with multiple parameters
下面的代码应该同时调用两个数据库。我试着用
但是运行进入ThreadPool有些困难。 pool.apply_async 好像不允许多个参数,所以我把它们放在一个元组中,然后尝试解包。这是正确的方法还是有更好的解决方案?
元组列表在 params=... 中定义,元组有 3 个条目。我希望函数被调用两次,每次有 3 个参数。
def get_sql(self, *params): # run with risk
self.logger.info(len(params))
sql=params[0]
schema=params[1]
db=params[2]
self.logger.info("Running SQL with schema: {0}".format(schema))
df = pd.read_sql(sql, db)
return df
def compare_prod_uat(self):
self.connect_dbrs_prod_db()
self.connect_dbrs_uat_db()
self.logger.info("connected to UAT and PROD database")
sql = """ SELECT * FROM TABLE """
params = [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)]
pool = ThreadPool(processes=2)
self.logger.info("Calling Pool")
result_prod = pool.apply_async(self.get_sql, (sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod))
result_uat = pool.apply_async(self.get_sql, (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat))
# df_prod = self.get_sql(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod)
# df_cuat = self.get_sql(sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)
self.logger.info("Get return from uat")
df1 = result_uat.get() # get return value from the database call
self.logger.info("Get return from prod")
df2 = result_prod.get() # get second return value from the database call
return df1, df2
可能有很多地方不对,但是如果加上
print params
作为 get_sql 的第一行,您会看到您发送了一个元组 (sql, [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, .....)])
所以是的,params 的长度总是两个,第一个参数是 "sql" 无论你的实现是什么,第二个是长度为 3 的元组数组。我不明白你为什么要发送 (sql,params) 而不是 (params,) 因为 "sql" 似乎存在于数组元素中。如果它需要在那里,你的数组在 params[1].
但是,我不明白你的辅助函数是如何遍历这个数组的。它似乎只执行一个 sql 语句,因为它没有 for 循环。也许您打算在 compare_prod_uat 函数中执行 for 循环并生成与数组中的元素一样多的工人?我不知道,但目前没有多大意义。
参数问题可以通过这个解决。
下面的代码应该同时调用两个数据库。我试着用 但是运行进入ThreadPool有些困难。 pool.apply_async 好像不允许多个参数,所以我把它们放在一个元组中,然后尝试解包。这是正确的方法还是有更好的解决方案?
元组列表在 params=... 中定义,元组有 3 个条目。我希望函数被调用两次,每次有 3 个参数。
def get_sql(self, *params): # run with risk
self.logger.info(len(params))
sql=params[0]
schema=params[1]
db=params[2]
self.logger.info("Running SQL with schema: {0}".format(schema))
df = pd.read_sql(sql, db)
return df
def compare_prod_uat(self):
self.connect_dbrs_prod_db()
self.connect_dbrs_uat_db()
self.logger.info("connected to UAT and PROD database")
sql = """ SELECT * FROM TABLE """
params = [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)]
pool = ThreadPool(processes=2)
self.logger.info("Calling Pool")
result_prod = pool.apply_async(self.get_sql, (sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod))
result_uat = pool.apply_async(self.get_sql, (sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat))
# df_prod = self.get_sql(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod)
# df_cuat = self.get_sql(sql, "DF_RISK_CUAT_OWNER", self.db_dbrs_uat)
self.logger.info("Get return from uat")
df1 = result_uat.get() # get return value from the database call
self.logger.info("Get return from prod")
df2 = result_prod.get() # get second return value from the database call
return df1, df2
可能有很多地方不对,但是如果加上
print params
作为 get_sql 的第一行,您会看到您发送了一个元组 (sql, [(sql, "DF_RISK_PRD_OWNER", self.db_dbrs_prod), (sql, .....)])
所以是的,params 的长度总是两个,第一个参数是 "sql" 无论你的实现是什么,第二个是长度为 3 的元组数组。我不明白你为什么要发送 (sql,params) 而不是 (params,) 因为 "sql" 似乎存在于数组元素中。如果它需要在那里,你的数组在 params[1].
但是,我不明白你的辅助函数是如何遍历这个数组的。它似乎只执行一个 sql 语句,因为它没有 for 循环。也许您打算在 compare_prod_uat 函数中执行 for 循环并生成与数组中的元素一样多的工人?我不知道,但目前没有多大意义。
参数问题可以通过这个解决。