没看懂Pythonconcurrent.futures
I don't understand Python concurrent.futures
我正在尝试使用多线程执行 do_stuff_parallel
函数。
def do_stuff_parallel(par1, par2, par3, par4, par5):
print("test1")
print(str(par1))
print("test2")
if(.. == ".."):
...
with ThreadPoolExecutor(max_workers=4) as executor:
futures = set()
for LinkedConnector in FuncGroupTask.Connectors:
f = executor.submit(do_stuff_parallel, par1, par2, par3, par4, par5)
futures.add(f)
每次都需要执行do_stuff_parallel
函数,一次有5个参数
现在它进入 do_stuff_parallel
方法,但它只打印 "test1" 而从不打印 par1 或 "test2.
你做的有点不对。 Executor.map()
将您列表中的每一项都映射到工作人员,您将获得异常。您的函数需要五个参数,但您只发送一个。使用 concurrent.futures 时,异常存储在未来,并且只有在您尝试检索结果时才会引发。这将向您显示例外情况:
def do_stuff_parallel(par1, par2, par3, par4, par5):
print("function entered")
par1=par2=par3=par4=par5 = 42
with ThreadPoolExecutor(max_workers=4) as executor:
for _ in range(1,10):
f = executor.map(do_stuff_parallel, [par1, par2, par3, par4, par5])
for q in f:
print(q)
结果:
TypeError: do_stuff_parallel() missing 4 required positional arguments: 'par2', 'par3', 'par4', and 'par5'
您需要传递一个元组,然后在您的函数中解压它:
def do_stuff_parallel(args):
print(args)
return(42)
par1=par2=par3=par4=par5 = 43
with ThreadPoolExecutor(max_workers=4) as executor:
for _ in range(1,10):
f = executor.map(do_stuff_parallel, [(5,6,7,8,9),(3,4,5,6,7)])
for q in f:
print(q)
现在您可以通过 args[0]、args[1] 等访问您的参数。如果您不能更改工作函数接口,您可以创建一个包装器,您的代理工作人员在其中调用真正的工作人员:
def proxy_worker(args):
return real_worker(args[0], args[1] ....)
当然,如果您不打算映射任何内容,请使用 executor.submit()
而不是 map()
。如果您的目的只是多次启动 worker,这将允许您发送多个参数。 Map 用作将迭代器映射到 worker 的助手,您可能根本不需要它。
使用 executor.submit()
,您可以保持原来的界面:
def do_stuff_parallel(a,b,c,d,e):
print(a)
return(42)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = set()
for _ in range(1,10):
f = executor.submit(do_stuff_parallel, par1, par2, par3, par4, par5)
futures.add(f)
我正在尝试使用多线程执行 do_stuff_parallel
函数。
def do_stuff_parallel(par1, par2, par3, par4, par5):
print("test1")
print(str(par1))
print("test2")
if(.. == ".."):
...
with ThreadPoolExecutor(max_workers=4) as executor:
futures = set()
for LinkedConnector in FuncGroupTask.Connectors:
f = executor.submit(do_stuff_parallel, par1, par2, par3, par4, par5)
futures.add(f)
每次都需要执行do_stuff_parallel
函数,一次有5个参数
现在它进入 do_stuff_parallel
方法,但它只打印 "test1" 而从不打印 par1 或 "test2.
你做的有点不对。 Executor.map()
将您列表中的每一项都映射到工作人员,您将获得异常。您的函数需要五个参数,但您只发送一个。使用 concurrent.futures 时,异常存储在未来,并且只有在您尝试检索结果时才会引发。这将向您显示例外情况:
def do_stuff_parallel(par1, par2, par3, par4, par5):
print("function entered")
par1=par2=par3=par4=par5 = 42
with ThreadPoolExecutor(max_workers=4) as executor:
for _ in range(1,10):
f = executor.map(do_stuff_parallel, [par1, par2, par3, par4, par5])
for q in f:
print(q)
结果:
TypeError: do_stuff_parallel() missing 4 required positional arguments: 'par2', 'par3', 'par4', and 'par5'
您需要传递一个元组,然后在您的函数中解压它:
def do_stuff_parallel(args):
print(args)
return(42)
par1=par2=par3=par4=par5 = 43
with ThreadPoolExecutor(max_workers=4) as executor:
for _ in range(1,10):
f = executor.map(do_stuff_parallel, [(5,6,7,8,9),(3,4,5,6,7)])
for q in f:
print(q)
现在您可以通过 args[0]、args[1] 等访问您的参数。如果您不能更改工作函数接口,您可以创建一个包装器,您的代理工作人员在其中调用真正的工作人员:
def proxy_worker(args):
return real_worker(args[0], args[1] ....)
当然,如果您不打算映射任何内容,请使用 executor.submit()
而不是 map()
。如果您的目的只是多次启动 worker,这将允许您发送多个参数。 Map 用作将迭代器映射到 worker 的助手,您可能根本不需要它。
使用 executor.submit()
,您可以保持原来的界面:
def do_stuff_parallel(a,b,c,d,e):
print(a)
return(42)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = set()
for _ in range(1,10):
f = executor.submit(do_stuff_parallel, par1, par2, par3, par4, par5)
futures.add(f)