Python 中的多处理同步问题,双 for 循环
Sync issues with multiprocessing in Python, double for loop
受教程multiprocessing factorial的启发,我尝试对一个微不足道的力计算模块进行多处理。
我主要担心的是 queue.get 功能没有按预期的顺序检索。例如,它不是给出 [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5],而是根据不同的处理器给出混乱的输出。 1)如何根据进程调用顺序追加队列中的结果?我应该使用池、地图、锁或任何类似的东西吗? 2)如何避免内存 sync/overwriting 问题?
def mp_worker(istart, iend, x, out_q1, out_q2):
global_N = len(x)
outdict1 = []
outdict2 = []
k = 0
for i in range(istart,iend,1):
temp_FX = 0
temp_FY = 0
for j in range(global_N):
if i != j:
temp_FX = temp_FX + (x[j]-x[i])
temp_FY = temp_FY + (x[j]-x[i])
outdict1.append(temp_FX)
outdict2.append(temp_FY)
k = k + 1
out_q1.put(outdict1)
out_q2.put(outdict2)
def mp_factorizer( nprocs):
x = mem.x
FORCE = mem.FORCE
N = len(FORCE)
out_q1 = multiprocessing.Queue()
out_q2 = multiprocessing.Queue()
chunksize = int(math.ceil(N / float(nprocs)))
procs = []
for i in range(nprocs):
istart = chunksize * i
iend = chunksize * (i + 1)
p = multiprocessing.Process(
target=mp_worker,
args=(istart, iend, x, out_q1, out_q2))
procs.append(p)
p.start()
# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict1 = []
resultdict2 = []
for i in range(nprocs):
resultdict1 = resultdict1 + out_q1.get()
resultdict2 = resultdict2 + out_q2.get()
# Wait for all worker processes to finish
for p in procs:
p.join()
return resultdict1
项目以工作进程恰好完成的顺序添加到队列中。如果你想强制下单,你必须...强制下单;-)
Process
不适合这个。它们的执行本质上是无序的,顺序很可能会从一个 运行 下一个改变。
在这种情况下,可能最简单:首先,完全抛开队列。像这样结束你的 mp_worker()
:
return outdict1, outdict2
然后使用Pool
。有几种方法可以使用一种。最像你已经在做的事情看起来像:
pool = multiprocessing.Pool(nprocs)
for i in range(nprocs):
istart = chunksize * i
iend = chunksize * (i + 1)
p = pool.apply_async(mp_worker, (istart, iend, x))
procs.append(p)
resultdict1 = []
resultdict2 = []
for p in procs:
t1, t2 = p.get()
resultdict1.extend(t1)
resultdict2.extend(t2)
pool.close()
pool.join()
现在获取结果的顺序与传递任务的顺序相同;订单已被强制执行。
注意:从 +
切换到 .extend()
在逻辑上不是必需的,但可以将二次时间(在循环迭代次数中)操作减少为分摊线性时间操作。这与多处理无关。 somelist = somelist + anotherlist
总是更好地编码为 somelist.extend(anotherlist)
。
关于OS
这里有一个关于为什么 "it worked" 在 Windows 而不是 Linux 的猜测:从历史上看,进程创建在 Linux 比 Windows 更便宜( Windows 投入更多精力来加速其线程)。这使得当进程执行大约相同数量的工作时,进程更有可能以它们开始的相同顺序在 Windows 结束。但他们肯定 可以 在 Windows 上完成 "out of order"
也是。
无论如何,Python 对此没有任何发言权:如果您需要特定订单,则必须强制执行该订单。
受教程multiprocessing factorial的启发,我尝试对一个微不足道的力计算模块进行多处理。 我主要担心的是 queue.get 功能没有按预期的顺序检索。例如,它不是给出 [5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5],而是根据不同的处理器给出混乱的输出。 1)如何根据进程调用顺序追加队列中的结果?我应该使用池、地图、锁或任何类似的东西吗? 2)如何避免内存 sync/overwriting 问题?
def mp_worker(istart, iend, x, out_q1, out_q2):
global_N = len(x)
outdict1 = []
outdict2 = []
k = 0
for i in range(istart,iend,1):
temp_FX = 0
temp_FY = 0
for j in range(global_N):
if i != j:
temp_FX = temp_FX + (x[j]-x[i])
temp_FY = temp_FY + (x[j]-x[i])
outdict1.append(temp_FX)
outdict2.append(temp_FY)
k = k + 1
out_q1.put(outdict1)
out_q2.put(outdict2)
def mp_factorizer( nprocs):
x = mem.x
FORCE = mem.FORCE
N = len(FORCE)
out_q1 = multiprocessing.Queue()
out_q2 = multiprocessing.Queue()
chunksize = int(math.ceil(N / float(nprocs)))
procs = []
for i in range(nprocs):
istart = chunksize * i
iend = chunksize * (i + 1)
p = multiprocessing.Process(
target=mp_worker,
args=(istart, iend, x, out_q1, out_q2))
procs.append(p)
p.start()
# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict1 = []
resultdict2 = []
for i in range(nprocs):
resultdict1 = resultdict1 + out_q1.get()
resultdict2 = resultdict2 + out_q2.get()
# Wait for all worker processes to finish
for p in procs:
p.join()
return resultdict1
项目以工作进程恰好完成的顺序添加到队列中。如果你想强制下单,你必须...强制下单;-)
Process
不适合这个。它们的执行本质上是无序的,顺序很可能会从一个 运行 下一个改变。
在这种情况下,可能最简单:首先,完全抛开队列。像这样结束你的 mp_worker()
:
return outdict1, outdict2
然后使用Pool
。有几种方法可以使用一种。最像你已经在做的事情看起来像:
pool = multiprocessing.Pool(nprocs)
for i in range(nprocs):
istart = chunksize * i
iend = chunksize * (i + 1)
p = pool.apply_async(mp_worker, (istart, iend, x))
procs.append(p)
resultdict1 = []
resultdict2 = []
for p in procs:
t1, t2 = p.get()
resultdict1.extend(t1)
resultdict2.extend(t2)
pool.close()
pool.join()
现在获取结果的顺序与传递任务的顺序相同;订单已被强制执行。
注意:从 +
切换到 .extend()
在逻辑上不是必需的,但可以将二次时间(在循环迭代次数中)操作减少为分摊线性时间操作。这与多处理无关。 somelist = somelist + anotherlist
总是更好地编码为 somelist.extend(anotherlist)
。
关于OS
这里有一个关于为什么 "it worked" 在 Windows 而不是 Linux 的猜测:从历史上看,进程创建在 Linux 比 Windows 更便宜( Windows 投入更多精力来加速其线程)。这使得当进程执行大约相同数量的工作时,进程更有可能以它们开始的相同顺序在 Windows 结束。但他们肯定 可以 在 Windows 上完成 "out of order" 也是。
无论如何,Python 对此没有任何发言权:如果您需要特定订单,则必须强制执行该订单。