python:multiprocessing.dummy 使用 imap_unordered 遍历 growing/modifying 列表

python: multiprocessing.dummy using imap_unordered to iterate thru a growing/modifying list

摘要

代码
为了测试这个想法,我写了以下内容,使用 windows 中的 python 2.7.10:

from multiprocessing.dummy import Pool as thPool
from time import sleep

def funct(n):
    #print "{"+str(n)+"}",
    #print "|"+str(len(li))+"|",
    if n % 2 == 0:
        return n
    elif n % 3 == 0:
        li.append(12)
    elif n % 5 == 0:
        li.append(14)
    #sleep(.25)

if __name__ == "__main__":
    P = thPool(4)
    li = [1,2,3,4,5,6,7,8,9]

    lf=(i for i in P.imap_unordered(funct,li) if i is not None)
    #print lf
    for m in lf:
        print "("+str(m)+")",
        #sleep(.25)

我期望得到偶数 (2,4,6,8),两个 12 和一个 14,顺序不分先后。

结果
i 运行 以上多次。我每次都得到不同的结果:

我假设迭代器在附加列表之前完成。我在 funct 中放置了一个调试打印语句来显示全局列表的大小 li 我得到了:

我也试过制造延迟,以防出现竞争条件,但这似乎对可预测性没有影响。

问题

  1. 为什么结果不可预测?为什么添加的列表 有时 没有被处理? (即使函数内列表的大小>9,意味着列表已被追加)
  2. 为什么 输出 总是有序的?
  3. 是否有使用 multiprocessing.dummy 池功能执行此操作的正确方法?
  4. 是否有推荐的替代方法?

编辑:我更清楚地重新阅读了问题,并了解到您想即时修改列表。这需要使用线程安全的适当共享对象(例如数组、队列甚至多处理库中的管理器)来完成。

此外,遗憾的是,我认为您不能在这种情况下使用 imap_unordered()。我认为您看到的行为是由于 imap_unordered 有时会到达可迭代对象的末尾并停止分发工作 before 附加项目被放置列表。

在这种情况下不要使用 multiprocessing.Pool.imap_unordered。有很多方法可以让它发挥作用,但它们既丑陋又脆弱。使用生产者-消费者模式,消费者偶尔充当生产者。

from multiprocessing.dummy import Process, Queue


def process(inq, outq):
    while True:
        n = inq.get()
        try:
            if n % 2 == 0:
                outq.put(n)  # Queue up for printing
            elif n % 3 == 0:
                inq.put(12)  # Queue up for future processing
            elif n % 5 == 0:
                inq.put(14)  # Queue up for future processing
        finally:
            inq.task_done()

def printer(q):
    while True:
        m = q.get()
        try:
            print "("+str(m)+")",
        finally:
            q.task_done()

def main():
    workqueue = Queue()
    printqueue = Queue()
    printworker = Process(target=printer, args=(printqueue,))
    printworker.daemon = True
    printworker.start()

    for i in range(4):
        processor = Process(target=process, args=(workqueue, printqueue))
        processor.daemon = True
        processor.start()

    li = [1,2,3,4,5,6,7,8,9]

    for x in li:
        workqueue.put(x)  # optionally, put all items before starting processor threads so initial work is processed strictly before generated work
    workqueue.join()  # Wait for all work, including new work, to be processed
    printqueue.join()  # Then wait for all the results to be printed

if __name__ == '__main__':
    main()