如何从多进程共享数据帧到主进程?
How to share datafram from multiprocess to main process?
这是我现在使用的第二版编码(来自Booboo),大约需要17分钟return查询结果,数据可以传输到专利进程。
from multiprocessing.pool import Pool
def do_query(query):
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description]
return pd.DataFrame(table, columns=col)
if __name__ == '__main__':
print('start:',datetime.datetime.now())
queries = [
"select aaa from AAA",
"select bbb from BBB",
"select ccc from CCC",
"select ddd from DDD",
]
pool = Pool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()
print('end:',datetime.datetime.now())
下面的脚本是我的第一个版本编码(原始版本),我用4个子进程查询,需要4~6分钟才能完成查询,但没有将查询数据从子进程传输到parent.but 我可以在子流程中保存这些数据。
def aquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select aaa from AAA")
aaa = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_aaa = pd.DataFrame(aaa, columns=col)
def bquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select bbb from BBB")
bbb = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_bbb = pd.DataFrame(bbb, columns=col)
if __name__ == '__main__':
print('start:', datetime.datetime.now())
aaa_process = multiprocessing.Process(target=aaa)
bbb_process = multiprocessing.Process(target=bbb)
aaa_process.start()
bbb_process.start()
aaa_process.join()
bbb_process.join()
print('end:', datetime.datetime.now())
所以我想知道第 2 版编码是否因为需要时间将数据从子进程传输到父进程而花费更多时间?
我认为(至少我希望)你的例子只是对你真正想要完成的事情的简化,正如所展示的那样无法实现任何性能改进,因为你不是 有效地 进行任何多处理。这是因为尽管您的主进程创建了一个子进程,但它会立即阻塞,直到该子进程完成并 return 返回其结果,因此不会发生并行计算。您所做的只是通过创建一个新进程并且必须将数据从一个地址 space 传输到另一个地址来增加额外的开销。
因此 真实 用例 multiprocessing/multithreading 会,例如,涉及您必须执行 多个 的情况每个查询都需要从结果中生成一个数据框。如果我们要分析在这种情况下涉及什么,就会有查询本身,然后是数据框的创建。由于在创建新进程和跨进程地址 space 发送数据时存在上述开销,我的方法是对查询使用多线程,这应该会很好地工作,因为之间的竞争问题全局解释器锁的线程不应该应用,因为查询任务主要处于网络等待状态。然后,您可以 return 将查询结果返回到主线程(现在不需要 inter-process 传输)并让主线程执行 CPU 从returned 数据或更简单地使用相同的工作函数从结果构建数据框,return 完成数据框。在后一种情况下,根据 pandas
是否释放全局解释器锁,您可能无法在构建数据帧时实现任何级别的并行处理,但不会比让主进程构建数据框。
在pandas
不释放Global Interpreter Lock的worst-case场景下,你确实不会做更多的CPU-intensive工作,即创建数据帧,并行处理将建议自己作为解决方案。但是,只有当每个任务的 CPU 处理量足够大,以至于并行计算所获得的收益抵消了额外开销所损失的收益时,多处理的开销才变得值得。我不相信在这种特殊情况下会出现这种情况。
因此,以下是使用多线程池的示例代码,适用于您有两个查询来创建两个数据帧的情况:
from multiprocessing.pool import ThreadPool
def do_query(query):
conn = cx_Oracle.connect("*", "*","*", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description]
return pd.DataFrame(table, columns=col)
if __name__ == '__main__':
queries = [
"select * from I where I.DATE between sysdate - 4 and sysdate - 3",
"select * from I where I.DATE between sysdate - 5 and sysdate - 4"
]
pool = ThreadPool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()
目前尚不清楚上述是否会提高串行代码 运行 两个连续查询的性能;这将取决于查询本身的复杂性(运行 时间)。
这是我现在使用的第二版编码(来自Booboo),大约需要17分钟return查询结果,数据可以传输到专利进程。
from multiprocessing.pool import Pool
def do_query(query):
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description]
return pd.DataFrame(table, columns=col)
if __name__ == '__main__':
print('start:',datetime.datetime.now())
queries = [
"select aaa from AAA",
"select bbb from BBB",
"select ccc from CCC",
"select ddd from DDD",
]
pool = Pool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()
print('end:',datetime.datetime.now())
下面的脚本是我的第一个版本编码(原始版本),我用4个子进程查询,需要4~6分钟才能完成查询,但没有将查询数据从子进程传输到parent.but 我可以在子流程中保存这些数据。
def aquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select aaa from AAA")
aaa = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_aaa = pd.DataFrame(aaa, columns=col)
def bquery():
conn = cx_Oracle.connect("*", "*", "***", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute("select bbb from BBB")
bbb = cursor.fetchall()
col = [x[0] for x in cursor.description]
df_bbb = pd.DataFrame(bbb, columns=col)
if __name__ == '__main__':
print('start:', datetime.datetime.now())
aaa_process = multiprocessing.Process(target=aaa)
bbb_process = multiprocessing.Process(target=bbb)
aaa_process.start()
bbb_process.start()
aaa_process.join()
bbb_process.join()
print('end:', datetime.datetime.now())
所以我想知道第 2 版编码是否因为需要时间将数据从子进程传输到父进程而花费更多时间?
我认为(至少我希望)你的例子只是对你真正想要完成的事情的简化,正如所展示的那样无法实现任何性能改进,因为你不是 有效地 进行任何多处理。这是因为尽管您的主进程创建了一个子进程,但它会立即阻塞,直到该子进程完成并 return 返回其结果,因此不会发生并行计算。您所做的只是通过创建一个新进程并且必须将数据从一个地址 space 传输到另一个地址来增加额外的开销。
因此 真实 用例 multiprocessing/multithreading 会,例如,涉及您必须执行 多个 的情况每个查询都需要从结果中生成一个数据框。如果我们要分析在这种情况下涉及什么,就会有查询本身,然后是数据框的创建。由于在创建新进程和跨进程地址 space 发送数据时存在上述开销,我的方法是对查询使用多线程,这应该会很好地工作,因为之间的竞争问题全局解释器锁的线程不应该应用,因为查询任务主要处于网络等待状态。然后,您可以 return 将查询结果返回到主线程(现在不需要 inter-process 传输)并让主线程执行 CPU 从returned 数据或更简单地使用相同的工作函数从结果构建数据框,return 完成数据框。在后一种情况下,根据 pandas
是否释放全局解释器锁,您可能无法在构建数据帧时实现任何级别的并行处理,但不会比让主进程构建数据框。
在pandas
不释放Global Interpreter Lock的worst-case场景下,你确实不会做更多的CPU-intensive工作,即创建数据帧,并行处理将建议自己作为解决方案。但是,只有当每个任务的 CPU 处理量足够大,以至于并行计算所获得的收益抵消了额外开销所损失的收益时,多处理的开销才变得值得。我不相信在这种特殊情况下会出现这种情况。
因此,以下是使用多线程池的示例代码,适用于您有两个查询来创建两个数据帧的情况:
from multiprocessing.pool import ThreadPool
def do_query(query):
conn = cx_Oracle.connect("*", "*","*", encoding="UTF-8")
cursor = conn.cursor()
sql = cursor.execute(query)
table = cursor.fetchall()
col = [x[0] for x in cursor.description]
return pd.DataFrame(table, columns=col)
if __name__ == '__main__':
queries = [
"select * from I where I.DATE between sysdate - 4 and sysdate - 3",
"select * from I where I.DATE between sysdate - 5 and sysdate - 4"
]
pool = ThreadPool(len(queries))
dataframes = pool.map(do_query, queries)
pool.close()
pool.join()
目前尚不清楚上述是否会提高串行代码 运行 两个连续查询的性能;这将取决于查询本身的复杂性(运行 时间)。