Dynamically add object to queue inside the worker method

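I'm using a Python multiprocessing Pool and a Queue to run tasks in parallel. However, I need to dynamically insert additional jobs into the queue and wait for them to finish (those jobs may in turn insert more jobs into the queue).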

from multiprocessing import Manager, Pool


def add_another(q,blocked_name):
    name = q.get()
    if 'a' in name and name not in blocked_name:
        print('The name contains an a')
        # Here I want to add another name in the queue
        # Like q.put('Another') even if 'Another' will create a loop

if __name__ == '__main__':

    m = Manager()
    q = m.Queue()
    p = Pool(3)

    worker = []

    req = ['alice','amanda','bob','mallery']

    for d in req:
        q.put(d)

    blocked_name = ['mallery','steve']

    for i in range(len(req)):
        worker.append(p.apply_async(add_another, (q,blocked_name,)))

    # Here I want to wait ALL the worker, even the one added inside the add_another method 
    [r.get() for r in worker]

How can I do this?

Thanks for the help.

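You seem to be conflating the queue with the pool's worker tasks: putting a name onto the queue does not, by itself, cause add_another() to be applied to it in a pool worker. Something also has to submit a new task to the pool.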

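One way to do what you want is to use the optional callback argument that apply_async() supports, and have the callback submit another add_another() task to the pool whenever a worker reports that it added a name to the queue. This guarantees that one task is submitted for every job added to the queue (each task simply takes the next name off the queue, so the number of tasks always matches the number of queued names).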
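The reason this works is that the callback passed to apply_async() is invoked in the parent process (in the pool's result-handling thread), not in the worker, so it has access to the pool object and can submit further tasks. A minimal sketch demonstrating this; report_pid() and demo() are hypothetical names used only for illustration:

```python
import os
from multiprocessing import Pool


def report_pid(tag):
    """Runs in a pool worker process and reports which process that is."""
    return tag, os.getpid()


def demo():
    parent_pid = os.getpid()
    seen = []  # filled in by the callback

    def on_result(result):
        # Runs in the parent process (the pool's result-handler thread), so it can
        # use parent-side objects such as `seen`, or the pool to submit more tasks.
        seen.append((result, os.getpid()))

    with Pool(2) as pool:
        pool.apply_async(report_pid, ('job-1',), callback=on_result)
        pool.close()
        pool.join()  # waits for the workers and the result handler, hence the callback

    (_tag, worker_pid), callback_pid = seen[0]
    return parent_pid, worker_pid, callback_pid


if __name__ == '__main__':
    parent_pid, worker_pid, callback_pid = demo()
    print(f'parent={parent_pid} worker={worker_pid} callback={callback_pid}')
```

The multiprocessing docs ask that callbacks return quickly, because they block the single thread that handles all results. Calling apply_async() from a callback is fine in that respect, since it only enqueues the task.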

Here's what I suggest. Note that I renamed some of the variables to make the code more readable.

from multiprocessing import Manager, Pool


def add_another(queue, blocked_names):
    name = queue.get()
    print(f'processing {name}')
    if 'a' in name and name not in blocked_names:
        print('The name contains an "a"')
        queue.put('Another')  # Add another to queue.
        return True  # Indicate another was added.
    return False  # Indicate another one NOT added.

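def check_result(another_added):
    ''' Start processing of another task if one was added to the queue. '''
    # Callbacks run in the main process (in the pool's result-handler thread),
    # so the module-level names pool, queue, results and blocked_names are visible.
    if another_added:
        results.append(pool.apply_async(add_another, (queue, blocked_names),
                                        callback=check_result))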

if __name__ == '__main__':

    mgr = Manager()
    queue = mgr.Queue()
    pool = Pool(3)

    results = []
    reqs = ['alice', 'amanda', 'bob', 'mallery']
    blocked_names = ['mallery', 'steve']

    # Initialize task queue.
    for req in reqs:
        queue.put(req)

    for _ in range(len(reqs)):
        results.append(pool.apply_async(add_another, (queue, blocked_names),
                                        callback=check_result))

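    # results can grow while this loop runs. In CPython a task's callback runs
    # before its get() returns, so tasks appended by check_result() are waited on too.
    res = [result.get() for result in results]
    # print(f'{res=}')
    pool.close()
    pool.join()
    print('-Fini-')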
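If you would rather not share a Manager queue between processes, a variation on the same callback idea is to pass each name directly as the task argument and have the worker return any follow-up names. The callback (running in the parent) submits them, and a counter of outstanding tasks tells the main thread when everything, including dynamically added jobs, has finished. This is a minimal sketch, assuming follow-up jobs are small enough to return as a list; process_name() and run() are hypothetical names, not part of the code above:

```python
import threading
from multiprocessing import Pool


def process_name(name, blocked_names):
    """Runs in a worker process: handle one name and return follow-up names."""
    print(f'processing {name}')
    if 'a' in name and name not in blocked_names:
        return ['Another']  # follow-up job, as in the question
    return []


def run(reqs, blocked_names, processes=3):
    """Process reqs plus every follow-up job they spawn; return the names handled."""
    if not reqs:
        return []
    lock = threading.Lock()
    all_done = threading.Event()
    processed, errors = [], []
    pending = len(reqs)  # counted up front so it cannot hit 0 during start-up

    with Pool(processes) as pool:

        def submit(name):
            pool.apply_async(process_name, (name, blocked_names),
                             callback=lambda new_names: on_done(name, new_names),
                             error_callback=on_error)

        def finish_one():
            # Caller must hold lock. Signals the main thread when no work is left.
            nonlocal pending
            pending -= 1
            if pending == 0:
                all_done.set()

        def on_done(name, new_names):
            # Runs in the parent's result-handler thread, not in a worker.
            nonlocal pending
            with lock:
                pending += len(new_names)  # count follow-ups before this task finishes
                processed.append(name)
                finish_one()
            for new_name in new_names:
                submit(new_name)

        def on_error(exc):
            with lock:
                errors.append(exc)
                finish_one()

        for name in reqs:
            submit(name)
        all_done.wait()  # every task, including follow-ups, has finished
        pool.close()
        pool.join()

    if errors:
        raise errors[0]
    return processed


if __name__ == '__main__':
    print(run(['alice', 'amanda', 'bob', 'mallery'], ['mallery', 'steve']))
```

Because each follow-up is counted before the task that produced it is marked finished, the counter can only reach zero once no work remains. As in the question, a job that always produces another job would still loop forever; here 'Another' contains no lowercase "a", so it terminates.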