Python Multiprocessing: Adding to Queue Within Child Process

I want to implement a file crawler that stores data to Mongo. I would like to use multiprocessing as a way to 'hand off' blocking tasks such as unzipping files, file scraping, and uploading to Mongo. Certain tasks depend on other tasks (i.e. a file needs to be unzipped before its contents can be scraped), so I want the ability to complete the necessary task and add new tasks to the same task queue.
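
To make the dependent tasks concrete, here is a rough sketch of the chaining I have in mind. The function names below (unzip_file, scrape_file, upload_to_mongo) are placeholders rather than code from my project; each task is a (function, *args) tuple on the shared queue, and a task that finishes can put its follow-up task onto the same queue:

def unzip_file(task_queue, archive_path):
    # Placeholder: pretend the archive was extracted here
    extracted_path = archive_path + '.extracted'
    # Unzipping is done, so enqueue the scrape step for the extracted file
    task_queue.put((scrape_file, task_queue, extracted_path))


def scrape_file(task_queue, file_path):
    # Placeholder: pretend some data was scraped from the file
    data = {'path': file_path}
    # Scraping is done, so enqueue the Mongo upload
    task_queue.put((upload_to_mongo, data))


def upload_to_mongo(data):
    # Placeholder: stand-in for the real Mongo upload
    print(f'uploading {data} to Mongo')

With that convention, a worker only ever needs to call function(*args) and never has to know what the next step is.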

Here is what I have so far:

import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done()


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()

Expected behavior:

Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Running: bar((1,))
bar: 1
Program completed.

Actual behavior:

Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Program completed.

I'm still fairly new to multiprocessing, so any help is greatly appreciated.

As @FrankYellin pointed out, this happens because the None sentinel is put into task_queue before bar is ever added.

Assuming the queue is always either non-empty or waiting on a task to complete while the program runs (which is the case here), the queue's join method can be used. Per the docs:

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
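
The same counting behaviour can be seen in isolation with the plain queue.Queue that the manager proxy wraps. Here is a minimal, self-contained sketch (separate from the crawler code) that uses a thread as the consumer:

import queue
import threading

q = queue.Queue()

def consumer():
    while True:
        item = q.get()
        print(f'processing {item}')
        q.task_done()  # decrement the unfinished-task count

threading.Thread(target=consumer, daemon=True).start()

for i in range(3):
    q.put(i)  # each put() increments the unfinished-task count

q.join()  # blocks until task_done() has been called once per put()
print('all items processed')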

Below is the updated code:

import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done() # <-- Notify queue that task is complete


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    # Block until all items in queue are popped and completed
    task_queue.join() # <---

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()
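
One note on the design choice, as far as I can tell: a plain multiprocessing.Queue has no task_done()/join(), and although multiprocessing.JoinableQueue does, the non-manager queue classes can only be shared with child processes through inheritance (e.g. passed to the Process constructor), so they cannot be put onto the queue itself as a task argument the way task_queue.put((foo, task_queue)) does. The manager.Queue() proxy supports both the task_done()/join() pattern and being passed around inside a task.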

This seems to be working as expected. I will update this if I find anything new. Thanks, everyone.