python:multiprocessing.dummy 使用 imap_unordered 遍历 growing/modifying 列表
python: multiprocessing.dummy using imap_unordered to iterate thru a growing/modifying list
摘要
- 我想使用多线程来处理一个列表。
- 在处理列表时,在某些情况下,可能会添加列表,因此也应处理新添加的内容。 (不会是无限的)
- 此处理的输出可能很大,并且只会迭代一次。我看不出有什么理由将整个事情都留在记忆中。
- 输出can/should立即处理。顺序无关紧要。
代码
为了测试这个想法,我写了以下内容,使用 windows 中的 python 2.7.10:
from multiprocessing.dummy import Pool as thPool
from time import sleep
def funct(n):
#print "{"+str(n)+"}",
#print "|"+str(len(li))+"|",
if n % 2 == 0:
return n
elif n % 3 == 0:
li.append(12)
elif n % 5 == 0:
li.append(14)
#sleep(.25)
if __name__ == "__main__":
P = thPool(4)
li = [1,2,3,4,5,6,7,8,9]
lf=(i for i in P.imap_unordered(funct,li) if i is not None)
#print lf
for m in lf:
print "("+str(m)+")",
#sleep(.25)
我期望得到偶数 (2,4,6,8),两个 12 和一个 14,顺序不分先后。
结果
i 运行 以上多次。我每次都得到不同的结果:
- (2) (4) (6) (8) (12) (14) (12)
- (2) (4) (6) (8) (12) (14)
- (2) (4) (6) (8)
我假设迭代器在附加列表之前完成。我在 funct 中放置了一个调试打印语句来显示全局列表的大小 li 我得到了:
- |9||9| |9||9| |10||10| |10| (2)|11||11| (4) (6) (8)
- |9||9| |9| |9||9| |11|(2)|11| |11||11| |11| |12| (4) (6) (8) (12) (14)
- |9||9| |9||9| |9| |10| (2)|11| (4)|11| (6)|11| |12|(8) |12| |12|(12) (14) (12)
我也试过制造延迟,以防出现竞争条件,但这似乎对可预测性没有影响。
问题
- 为什么结果不可预测?为什么添加的列表 有时 没有被处理? (即使函数内列表的大小>9,意味着列表已被追加)
- 为什么 输出 总是有序的?
- 是否有使用 multiprocessing.dummy 池功能执行此操作的正确方法?
- 是否有推荐的替代方法?
编辑:我更清楚地重新阅读了问题,并了解到您想即时修改列表。这需要使用线程安全的适当共享对象(例如数组、队列甚至多处理库中的管理器)来完成。
此外,遗憾的是,我认为您不能在这种情况下使用 imap_unordered()
。我认为您看到的行为是由于 imap_unordered
有时会到达可迭代对象的末尾并停止分发工作 before 附加项目被放置列表。
在这种情况下不要使用 multiprocessing.Pool.imap_unordered
。有很多方法可以让它发挥作用,但它们既丑陋又脆弱。使用生产者-消费者模式,消费者偶尔充当生产者。
from multiprocessing.dummy import Process, Queue
def process(inq, outq):
while True:
n = inq.get()
try:
if n % 2 == 0:
outq.put(n) # Queue up for printing
elif n % 3 == 0:
inq.put(12) # Queue up for future processing
elif n % 5 == 0:
inq.put(14) # Queue up for future processing
finally:
inq.task_done()
def printer(q):
while True:
m = q.get()
try:
print "("+str(m)+")",
finally:
q.task_done()
def main():
workqueue = Queue()
printqueue = Queue()
printworker = Process(target=printer, args=(printqueue,))
printworker.daemon = True
printworker.start()
for i in range(4):
processor = Process(target=process, args=(workqueue, printqueue))
processor.daemon = True
processor.start()
li = [1,2,3,4,5,6,7,8,9]
for x in li:
workqueue.put(x) # optionally, put all items before starting processor threads so initial work is processed strictly before generated work
workqueue.join() # Wait for all work, including new work, to be processed
printqueue.join() # Then wait for all the results to be printed
if __name__ == '__main__':
main()
摘要
- 我想使用多线程来处理一个列表。
- 在处理列表时,在某些情况下,可能会添加列表,因此也应处理新添加的内容。 (不会是无限的)
- 此处理的输出可能很大,并且只会迭代一次。我看不出有什么理由将整个事情都留在记忆中。
- 输出can/should立即处理。顺序无关紧要。
代码
为了测试这个想法,我写了以下内容,使用 windows 中的 python 2.7.10:
from multiprocessing.dummy import Pool as thPool
from time import sleep
def funct(n):
#print "{"+str(n)+"}",
#print "|"+str(len(li))+"|",
if n % 2 == 0:
return n
elif n % 3 == 0:
li.append(12)
elif n % 5 == 0:
li.append(14)
#sleep(.25)
if __name__ == "__main__":
P = thPool(4)
li = [1,2,3,4,5,6,7,8,9]
lf=(i for i in P.imap_unordered(funct,li) if i is not None)
#print lf
for m in lf:
print "("+str(m)+")",
#sleep(.25)
我期望得到偶数 (2,4,6,8),两个 12 和一个 14,顺序不分先后。
结果
i 运行 以上多次。我每次都得到不同的结果:
- (2) (4) (6) (8) (12) (14) (12)
- (2) (4) (6) (8) (12) (14)
- (2) (4) (6) (8)
我假设迭代器在附加列表之前完成。我在 funct 中放置了一个调试打印语句来显示全局列表的大小 li 我得到了:
- |9||9| |9||9| |10||10| |10| (2)|11||11| (4) (6) (8)
- |9||9| |9| |9||9| |11|(2)|11| |11||11| |11| |12| (4) (6) (8) (12) (14)
- |9||9| |9||9| |9| |10| (2)|11| (4)|11| (6)|11| |12|(8) |12| |12|(12) (14) (12)
我也试过制造延迟,以防出现竞争条件,但这似乎对可预测性没有影响。
问题
- 为什么结果不可预测?为什么添加的列表 有时 没有被处理? (即使函数内列表的大小>9,意味着列表已被追加)
- 为什么 输出 总是有序的?
- 是否有使用 multiprocessing.dummy 池功能执行此操作的正确方法?
- 是否有推荐的替代方法?
编辑:我更清楚地重新阅读了问题,并了解到您想即时修改列表。这需要使用线程安全的适当共享对象(例如数组、队列甚至多处理库中的管理器)来完成。
此外,遗憾的是,我认为您不能在这种情况下使用 imap_unordered()
。我认为您看到的行为是由于 imap_unordered
有时会到达可迭代对象的末尾并停止分发工作 before 附加项目被放置列表。
在这种情况下不要使用 multiprocessing.Pool.imap_unordered
。有很多方法可以让它发挥作用,但它们既丑陋又脆弱。使用生产者-消费者模式,消费者偶尔充当生产者。
from multiprocessing.dummy import Process, Queue
def process(inq, outq):
while True:
n = inq.get()
try:
if n % 2 == 0:
outq.put(n) # Queue up for printing
elif n % 3 == 0:
inq.put(12) # Queue up for future processing
elif n % 5 == 0:
inq.put(14) # Queue up for future processing
finally:
inq.task_done()
def printer(q):
while True:
m = q.get()
try:
print "("+str(m)+")",
finally:
q.task_done()
def main():
workqueue = Queue()
printqueue = Queue()
printworker = Process(target=printer, args=(printqueue,))
printworker.daemon = True
printworker.start()
for i in range(4):
processor = Process(target=process, args=(workqueue, printqueue))
processor.daemon = True
processor.start()
li = [1,2,3,4,5,6,7,8,9]
for x in li:
workqueue.put(x) # optionally, put all items before starting processor threads so initial work is processed strictly before generated work
workqueue.join() # Wait for all work, including new work, to be processed
printqueue.join() # Then wait for all the results to be printed
if __name__ == '__main__':
main()