Python concurrent.futures 使用子进程,运行 多个 python 脚本
Python concurrent.futures using subprocess, running several python script
我想 运行 多个 python 脚本同时使用 concurrent.futures。
我的代码的串行版本在文件夹中查找特定的 python 文件并执行它。
import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf
FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern = "Read.py"
for dir,_,_ in os.walk(start_dir):
FileList.extend(glob(os.path.join(dir,pattern))) ;
FileList
i=0
for file in FileList:
dir=os.path.dirname((file))
dirname1 = os.path.basename(dir)
print(dirname1)
i=i+1
Str='python '+ file
print(Str)
completed_process = subprocess.run(Str)`
对于我的代码的并行版本:
def Python_callback(future):
print(future.run_type, future.jid)
return "One Folder finished executing"
def Python_execute():
from concurrent.futures import ProcessPoolExecutor as Pool
args = FileList
pool = Pool(max_workers=1)
future = pool.submit(subprocess.call, args, shell=1)
future.run_type = "run_type"
future.jid = FileList
future.add_done_callback(Python_callback)
print("Python executed")
if __name__ == '__main__':
import subprocess
Python_execute()
问题是我不确定如何将 FileList 的每个元素传递给分隔 cpu
提前感谢您的帮助
最小的变化是对每个元素使用一次 submit
,而不是对整个列表使用一次:
futures = []
for file in FileList:
future = pool.submit(subprocess.call, file, shell=1)
future.blah blah
futures.append(future)
futures
列表仅在您想对期货做某事时才是必需的——等待它们完成,检查它们的 return 值,等等。
与此同时,您正在使用 max_workers=1
显式创建池。毫不奇怪,这意味着您只会获得 1 个工作子进程,因此它最终会等待一个子进程完成,然后再获取下一个子进程。如果你想同时 运行 它们,删除那个 max_workers
并让它默认为每个核心一个(或者传递 max_workers=8
或其他一些不是 1
的数字,如果你有充分的理由覆盖默认值)。
虽然我们正在做这件事,但有很多方法可以简化您正在做的事情:
- 这里真的需要
multiprocessing
吗?如果您需要与每个子进程进行通信,那么在单个线程中进行通信可能会很痛苦 — 但线程,或者 asyncio
可以像这里的进程一样工作。
- 更重要的是,看起来您实际上不需要任何东西,只需启动进程并等待它完成,这可以用简单的同步代码完成。
- 为什么要构建字符串并使用
shell=1
而不是仅传递列表而不使用 shell?使用 shell 会不必要地产生开销、安全问题和调试烦恼。
- 您真的不需要每个 future 上的
jid
— 它只是您所有调用字符串的列表,这没有用。可能更有用的是某种标识符,或者子进程 return 代码,或者……可能还有很多其他东西,但它们都是可以通过读取 return 值 subprocess.call
或一个简单的包装器。
- 你也真的不需要回调。如果您只是将所有期货收集在一个列表中并
as_completed
它,您可以更简单地打印结果。
- 如果您同时执行以上两项操作,那么您在循环中只剩下一个
pool.submit
——这意味着您可以将整个循环替换为 pool.map
。
- 您很少需要或不想混合使用
os.walk
和 glob
。当您实际拥有 glob 模式时,将 fnmatch
应用于 os.walk
中的 files
列表。但是在这里,您只是在每个目录中查找特定的文件名,所以实际上,您需要过滤的只是 file == 'Read.py'
.
- 您没有在循环中使用
i
。但是如果你确实需要它,最好做 for i, file in enumerate(FileList):
而不是 for file in FileList:
并手动增加 i
.
我想 运行 多个 python 脚本同时使用 concurrent.futures。 我的代码的串行版本在文件夹中查找特定的 python 文件并执行它。
import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf
FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern = "Read.py"
for dir,_,_ in os.walk(start_dir):
FileList.extend(glob(os.path.join(dir,pattern))) ;
FileList
i=0
for file in FileList:
dir=os.path.dirname((file))
dirname1 = os.path.basename(dir)
print(dirname1)
i=i+1
Str='python '+ file
print(Str)
completed_process = subprocess.run(Str)`
对于我的代码的并行版本:
def Python_callback(future):
print(future.run_type, future.jid)
return "One Folder finished executing"
def Python_execute():
from concurrent.futures import ProcessPoolExecutor as Pool
args = FileList
pool = Pool(max_workers=1)
future = pool.submit(subprocess.call, args, shell=1)
future.run_type = "run_type"
future.jid = FileList
future.add_done_callback(Python_callback)
print("Python executed")
if __name__ == '__main__':
import subprocess
Python_execute()
问题是我不确定如何将 FileList 的每个元素传递给分隔 cpu
提前感谢您的帮助
最小的变化是对每个元素使用一次 submit
,而不是对整个列表使用一次:
futures = []
for file in FileList:
future = pool.submit(subprocess.call, file, shell=1)
future.blah blah
futures.append(future)
futures
列表仅在您想对期货做某事时才是必需的——等待它们完成,检查它们的 return 值,等等。
与此同时,您正在使用 max_workers=1
显式创建池。毫不奇怪,这意味着您只会获得 1 个工作子进程,因此它最终会等待一个子进程完成,然后再获取下一个子进程。如果你想同时 运行 它们,删除那个 max_workers
并让它默认为每个核心一个(或者传递 max_workers=8
或其他一些不是 1
的数字,如果你有充分的理由覆盖默认值)。
虽然我们正在做这件事,但有很多方法可以简化您正在做的事情:
- 这里真的需要
multiprocessing
吗?如果您需要与每个子进程进行通信,那么在单个线程中进行通信可能会很痛苦 — 但线程,或者asyncio
可以像这里的进程一样工作。 - 更重要的是,看起来您实际上不需要任何东西,只需启动进程并等待它完成,这可以用简单的同步代码完成。
- 为什么要构建字符串并使用
shell=1
而不是仅传递列表而不使用 shell?使用 shell 会不必要地产生开销、安全问题和调试烦恼。 - 您真的不需要每个 future 上的
jid
— 它只是您所有调用字符串的列表,这没有用。可能更有用的是某种标识符,或者子进程 return 代码,或者……可能还有很多其他东西,但它们都是可以通过读取 return 值subprocess.call
或一个简单的包装器。 - 你也真的不需要回调。如果您只是将所有期货收集在一个列表中并
as_completed
它,您可以更简单地打印结果。 - 如果您同时执行以上两项操作,那么您在循环中只剩下一个
pool.submit
——这意味着您可以将整个循环替换为pool.map
。 - 您很少需要或不想混合使用
os.walk
和glob
。当您实际拥有 glob 模式时,将fnmatch
应用于os.walk
中的files
列表。但是在这里,您只是在每个目录中查找特定的文件名,所以实际上,您需要过滤的只是file == 'Read.py'
. - 您没有在循环中使用
i
。但是如果你确实需要它,最好做for i, file in enumerate(FileList):
而不是for file in FileList:
并手动增加i
.