Python: spreading subprocess.call across multiple CPU cores
I have some Python code that uses the subprocess package to run a command in the shell:
subprocess.call(mycode.py, shell=inshell)
When I run the top command, I see that I am only using ~30% or less of the CPU.
I realize some commands may be using disk rather than CPU, which is why I was timing the speed.
Running this on a Linux system seems to be slower than on a 2-core Mac.
How do I parallelize this with the threading or multiprocessing package so that I can use multiple CPU cores on that Linux system?
Well, you can first create a thread and pass it the function you want to parallelize. Inside that function you make the subprocess call.
import threading
import subprocess

def worker():
    """Thread worker function."""
    print('Worker')
    # The command string below is a placeholder for whatever you were
    # passing to subprocess.call originally.
    subprocess.call('python mycode.py', shell=True)

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()
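If the main program also needs to wait until every one of those shell commands has finished, a small addition (not part of the original answer) is to join the threads after starting them:

for t in threads:
    t.join()  # block until each subprocess.call() has returned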
To parallelize the work done inside mycode.py, you need to organize the code so that it fits this basic pattern:
# Import the kind of pool you want to use (processes or threads).
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

# Collect work items as an iterable of single values (eg tuples,
# dicts, or objects). If you can't hold all items in memory,
# define a function that yields work items instead.
work_items = [
    (1, 'A', True),
    (2, 'X', False),
    ...
]

# Define a callable to do the work. It should take one work item.
def worker(tup):
    # Do the work.
    ...
    # Return any results.
    ...

# Create a ThreadPool (or a process Pool) of desired size.
# What size? Experiment. Slowly increase until it stops helping.
pool = ThreadPool(4)

# Do work and collect results.
# Or use pool.imap() or pool.imap_unordered().
work_results = pool.map(worker, work_items)

# Wrap up.
pool.close()
pool.join()
---------------------
# Or, in Python 3.3+ you can do it like this, skipping the wrap-up code.
with ThreadPool(4) as pool:
    work_results = pool.map(worker, work_items)
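As a concrete, hypothetical application of this pattern to the original question, each work item can simply be a command string and the worker can hand it to subprocess. The script and input names below are placeholders, not anything taken from the question:

import subprocess
from multiprocessing.dummy import Pool as ThreadPool

# Placeholder commands; in practice build these from your real inputs.
work_items = [
    'python mycode.py input1.txt',
    'python mycode.py input2.txt',
    'python mycode.py input3.txt',
]

def worker(cmd):
    # subprocess.call() blocks while the child runs but releases the GIL,
    # so a thread pool is enough to keep several CPU cores busy.
    return subprocess.call(cmd, shell=True)

pool = ThreadPool(4)
return_codes = pool.map(worker, work_items)  # one exit code per command
pool.close()
pool.join()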
A slight modification of FMc's answer:
from multiprocessing import Pool
import time

work_items = [(1, 'A', True), (2, 'X', False), (3, 'B', False)]

def worker(tup):
    for i in range(5000000):
        print(work_items)
    return

pool = Pool(processes=8)
start = time.time()
work_results = pool.map(worker, work_items)
end = time.time()
print(end - start)
pool.close()
pool.join()
The code above takes 53.60 seconds. The trick below, however, takes 27.34 seconds.
from multiprocessing import Pool
from functools import partial
import time

work_items = [(1, 'A', True), (2, 'X', False), (3, 'B', False)]

def worker(tup):
    for i in range(5000000):
        print(work_items)
    return

def parallel_attribute(worker):
    def easy_parallelize(worker, work_items):
        pool = Pool(processes=8)
        work_results = pool.map(worker, work_items)
        pool.close()
        pool.join()
    return partial(easy_parallelize, worker)

# Attach the parallelized version as an attribute of the function,
# then call it with the list of work items.
worker.parallel = parallel_attribute(worker)

start = time.time()
worker.parallel(work_items)
end = time.time()
print(end - start)
Two comments:
1) I didn't see any difference from using multiprocessing.dummy.
2) Using Python's partial function (with a nested scope) acts as a great wrapper and cut the computation time in half. Reference: https://www.binpress.com/tutorial/simple-python-parallelism/121
Also, thanks FMc!
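Finally, to tie this wrapper back to the original subprocess question: the same parallel_attribute helper defined above can be attached to a function that shells out to mycode.py. A minimal sketch under that assumption (the repeated command list is only a placeholder):

import subprocess

def run_command(cmd):
    return subprocess.call(cmd, shell=True)

# Attach the helper, then fan the commands out over the process pool.
run_command.parallel = parallel_attribute(run_command)
run_command.parallel(['python mycode.py'] * 4)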