How to run a parallel function inside a serial one in Python?
Maybe this is really simple, but I am having some trouble understanding it.
The challenge I face is to execute a child parallel function from inside a mother function. The mother function should run only once while waiting for the results of the child parallel function calls.
I wrote a small example that illustrates my dilemma.
import random
import string
from joblib import Parallel, delayed
import multiprocessing

def jobToDoById(id):
    # do some other logic based on the ID given
    rand_str = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(10))
    return [id, rand_str]

def childFunctionParallel(jobs):
    num_cores = multiprocessing.cpu_count()
    num_cores = num_cores - 1
    if __name__ == '__main__':
        p = Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i) for i in jobs)
        return p

def childFunctionSerial(jobs):
    result = []
    for job in jobs:
        job_result = jobToDoById(job)
        result.append(job_result)
    return result

def motherFunction(countries_cities, doInParallel):
    result = []
    print("Start mainLogic")
    for country in countries_cities:
        city_list = countries_cities[country]
        if(doInParallel):
            cities_result = childFunctionParallel(city_list)
        else:
            cities_result = childFunctionSerial(city_list)
        result.append(cities_result)
        # ..... do some more logic
    # ..... do some more logic before returning
    print("End mainLogic")
    return result

print("Start Program")
countries_cities = {
    "United States" : ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
    "United Kingdom" : ["Cambridge", "Coventry", "Gloucester", "Nottingham"],
    "France" : ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    "Denmark" : ["Aarhus", "Slagelse", "Nykøbing F", "Rønne", "Odense"],
    "Australia" : ["Sydney", "Townsville", "Bendigo", "Bathurst", "Busselton"],
}
result_mother = motherFunction(countries_cities, doInParallel=True) # should be executed only once
print(result_mother)
print("End Program")
If you switch doInParallel between True and False, you can see the problem. When running with childFunctionSerial(), motherFunction() runs only once. But when we run with childFunctionParallel, motherFunction() gets executed multiple times. Both give the same result, but the problem I have is that motherFunction() should be executed only once.
Two questions:
1. How can I restructure the program so that the mother function is executed once, starting parallel jobs from inside it, without running multiple instances of that same mother function?
2. How can I pass a second parameter to jobToDoById() besides the id?
Ad 2: put the additional parameters into a tuple and pass it as ( id, .., )
This one is rather simple and commonly used; you can meet it in many examples.
def jobToDoById( aTupleOfPARAMs = ( -1, ) ):       # jobToDoById(id):
    #                                              # do some other logic based on the ID given
    if not type( aTupleOfPARAMs ) is tuple:        # FUSE PROTECTION
        return [-1, "call interface violated"]
    if aTupleOfPARAMs[0] == -1:                    # FUSE PROTECTION
        return [-1, None]
    # ............................................# GO GET PROCESSED:
    rand_str = ''.join( random.choice( string.ascii_lowercase
                                     + string.ascii_uppercase
                                     + string.digits
                                       )
                        for i in range( 10 )
                        )
    return [aTupleOfPARAMs[0], rand_str]           # id now travels inside the tuple
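A minimal, self-contained sketch of this tuple-passing pattern follows; the two-field tuple ( id, n_chars ) and the guarded joblib demo are illustrative assumptions, not part of the original code:

```python
import random
import string

def jobToDoById(aTupleOfPARAMs=(-1,)):
    # expects a tuple ( id, n_chars ); returns [ id, random string of n_chars ]
    if not isinstance(aTupleOfPARAMs, tuple):        # FUSE PROTECTION
        return [-1, "call interface violated"]
    if aTupleOfPARAMs[0] == -1:                      # FUSE PROTECTION
        return [-1, None]
    an_id, n_chars = aTupleOfPARAMs                  # unpack id + the extra parameter
    rand_str = ''.join(random.choice(string.ascii_letters + string.digits)
                       for _ in range(n_chars))
    return [an_id, rand_str]

if __name__ == '__main__':
    try:
        from joblib import Parallel, delayed
        jobs = ["Aarhus", "Slagelse", "Odense"]
        # each delayed call receives exactly one argument: the parameter tuple
        print(Parallel(n_jobs=2)(delayed(jobToDoById)((job, 10)) for job in jobs))
    except ImportError:
        pass  # joblib not installed; jobToDoById() still works when called serially
```

Packing everything into one tuple keeps the remote-call signature stable, so the fuse-protection checks stay cheap and the worker never has to guess how many positional arguments it was given.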
The first question is harder, but the more interesting one, since the principal system-design differences among [SERIAL], "just"-[CONCURRENT] and true-[PARALLEL] system-scheduling policies of more than one process are not always respected in popular media (and sometimes not even in academia).
Ad 1: you may be surprised, but this never happens in the current version
Your code explicitly mentions the joblib.Parallel and multiprocessing modules, but the documentation says:
By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes.
There are two implications: your processing will pay double the costs, both [TIME]-domain and [SPACE]-domain overhead costs, that may easily become unacceptably huge OVERHEAD COSTS (and if one has also noticed the words "data" and "serialized" in the citation above, the better); for details see the re-formulated Amdahl's Law, as detailed in Section: Criticism, parallelism-amdahl:
1) the whole Python interpreter, including its data and internal state, is fully forked (so you get as many copies as requested, each running just one process-flow; this is done so as not to lose performance on GIL-round-robin fragmentation / a just-1-runs-All-Others-have-to-wait type of GIL-blocking / stepping, which is present in any 1+ processing-flow if made in threading-based pools, etc.)
2) besides all of the complete Python-interpreter + state re-instantiations that have to take place as noted above, ALL the <data-IN> + <data-OUT> are also:
----------------------------MAIN-starts-to-escape-from-pure-[SERIAL]-processing--
0: MAIN forks self
[1]
[2]
...
[n_jobs] - as many copies of self as requested
-------------------------MAIN-can-continue-in-"just"-[CONCURRENT]-after:
1st-Data-IN-SERialised-in-MAIN's-"__main__"
+ 2nd-Data-IN-QUEued      in MAIN
+ 3rd-Data-IN-DEQueued [ith_job]s
+ 4th-Data-IN-DESerialised [ith_job]s
+ ( ...process operated the useful [ith_job]s -<The PAYLOAD>-planned... )
+ 5th-Data-OUT-SERialised [ith_job]s
+ 6th-Data-OUT-QUEued [ith_job]s
+ 7th-Data-OUT-DEQueued in-MAIN
+ 8th-Data-OUT-DESerialised-in-MAIN's-"__main__"
-------------------------------MAIN-can-continue-in-pure-[SERIAL]-processing-----
which always adds up to a non-negligible overhead time (for the equations and details, refer to the overhead-strict re-formulation of the net speedups achievable upon these add-on overhead costs; best done before deciding about any refactoring, as your machine will otherwise pay far more than whatever is gained by trying to ignore these principal, benchmarkable overhead costs)
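The overhead-strict point can be made concrete with the original Amdahl's Law, S = 1 / ((1 - p) + p / n); the fixed-overhead term in the second function below is a deliberate simplification of the stricter re-formulation referred to above:

```python
def amdahl_speedup(p, n):
    # original Amdahl's Law: p = parallelisable fraction of the work,
    # n = number of workers; the (1 - p) serial part caps the speedup
    return 1.0 / ((1.0 - p) + p / n)

def overhead_aware_speedup(p, n, o):
    # simplified overhead-strict variant: o = add-on overhead costs
    # (fork + SER/DES + queueing), expressed as a fraction of the
    # original single-process runtime
    return 1.0 / ((1.0 - p) + p / n + o)
```

With p = 0.95 and n = 8, the ideal law predicts about 5.9x, while already a 10 % overhead drops the achievable factor to roughly 3.7x, and any speedup vanishes once o outweighs the saved p - p/n.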
Tools are available for benchmarking these overhead costs, each one separately and down to microseconds (though not all Stack Overflow members are keen on quantitatively robust benchmarking of these); just check other posts here on Stack Overflow.
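To make the SER/DES steps of the scheme above tangible, one can measure just the serialisation part with the stdlib pickle module; this is a simplified sketch, as the real joblib transport adds queueing and process-management costs on top:

```python
import pickle
import time

def serdes_cost_us(payload, repeats=1000):
    # average cost, in microseconds, of one serialise + deserialise round-trip,
    # i.e. the 1st/4th and 5th/8th steps of the scheme above, taken alone
    t0 = time.perf_counter()
    for _ in range(repeats):
        restored = pickle.loads(pickle.dumps(payload))
    t1 = time.perf_counter()
    assert restored == payload                 # sanity: the round-trip is lossless
    return (t1 - t0) / repeats * 1e6

if __name__ == '__main__':
    small = ["Aarhus", "Odense"]
    large = small * 100_000
    print(f"small payload: {serdes_cost_us(small):10.2f} us/round-trip")
    print(f"large payload: {serdes_cost_us(large, repeats=10):10.2f} us/round-trip")
```

Running this with growing payloads shows why the "data" and "serialized" words in the documentation citation matter: the transfer cost scales with the data, not with the number of CPU cores.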
The next principal limitation of the joblib.Parallel implementation is a resources-real-availability-agnostic optimism, whereas resources-state-aware scheduling is what actually happens on every real-world system.
One may expect an arbitrarily high degree of parallel code execution, but unless complex measures are taken for end-to-end (top-to-bottom) system coverage, all processing falls into a "just"-[CONCURRENT] schedule (i.e. if resources permit). This aspect goes beyond the footprint of this post and, naively put into the scheme above, it means that if CPU-cores (principally any other resource-class) are not available, the concurrency will never reach the levels of speedup that a resources-availability-agnostic, original Amdahl's Law was promising.
----------------------------MAIN-starts-escape-from-processing---in-pure-[SERIAL]
0: MAIN forks self -in-pure-[SERIAL]
[1] -in-pure-[SERIAL]
[2] -in-pure-[SERIAL]
... -in-pure-[SERIAL]
[n_jobs] as many copies of self-in-pure-[SERIAL]
as requested -in-pure-[SERIAL]
--------------------------MAIN-can-continue-in-"just"-[CONCURRENT]after[SERIAL]
+ 1st-Data-IN-SERialised-in-MAIN's-"__main__" , job(2), .., job(n_jobs):[SERIAL]
+ 2nd-Data-IN-QUEued      in MAIN for all job(1), job(2), .., job(n_jobs):[SERIAL]
+ 3rd-Data-IN-DEQueued [ith_job]s: "just"-[CONCURRENT]||X||X||
+ 4th-Data-IN-DESerialised [ith_job]s: "just"-[CONCURRENT]|X||X|||
+ ( ...process operated the useful [ith_job]s-<The PAYLOAD>-planned... )||X|||X|
+ 5th-Data-OUT-SERialised [ith_job]s: "just"-[CONCURRENT]||||X|||
+ 6th-Data-OUT-QUEued [ith_job]s: "just"-[CONCURRENT]|X|X|X||
+ 7th-Data-OUT-DEQueued in-MAIN <--l job(1), job(2), .., job(n_jobs):[SERIAL]
+ 8th-Data-OUT-DESerialised-in-MAIN's-"__main__" job(2), .., job(n_jobs):[SERIAL]
-------------------------------MAIN-can-continue-processing------in-pure-[SERIAL]
... -in-pure-[SERIAL]
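All of the above also explains why motherFunction() appears to restart: on spawn-based platforms each worker re-imports the main module, so any unguarded top-level call to motherFunction() re-executes in every worker. One common restructuring, not spelled out above, is to move the __main__ guard to module level; this sketch uses the stdlib multiprocessing.Pool for self-containedness (an assumption on my side; the same guard rule applies to joblib.Parallel):

```python
import random
import string
import multiprocessing

def jobToDoById(id):
    # worker payload: build a 10-character random string for the given id
    rand_str = ''.join(random.choice(string.ascii_letters + string.digits)
                       for _ in range(10))
    return [id, rand_str]

def motherFunction(countries_cities):
    # safe to spawn workers here: this function is only ever called
    # from under the top-level __main__ guard below
    result = []
    n_workers = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(processes=n_workers) as pool:
        for country, city_list in countries_cities.items():
            result.append(pool.map(jobToDoById, city_list))
    return result

if __name__ == '__main__':            # guard at module level, not inside a child function
    demo = {"Denmark": ["Aarhus", "Odense"]}
    print(motherFunction(demo))       # runs exactly once, even on spawn-based platforms
```

The key difference from the question's code is that the guard protects the call site of motherFunction(), rather than sitting inside childFunctionParallel(), so re-imported workers define the functions but never re-run the program.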