Python Queue inheritance issues with multiple processes

from multiprocessing import Process, Manager, Queue
import schedule
import time

def checkBirthdays(accountQueue):
    print('[CheckBirthdays] Initated')
    acc = {
        'email': 'demo@test.com'
    }
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    accountProcessing.append(acc)
    accountQueue.put(accountProcessing) # ****

def createSchedule(accountQueue):
    # similar to cron, executes at a certain time
    schedule.every().day.at("23:51").do(checkBirthdays, accountQueue)
    while True:
        schedule.run_pending()
        # check every 60 seconds
        time.sleep(60)

def main():
    # FileNotFoundError: [Errno 2] No such file or directory
    manager = Manager()
    accountQueue = manager.Queue()

    # RuntimeError: Queue objects should only be shared between processes through inheritance
    # accountQueue = Queue()
    schedule = Process(target=createSchedule, args=(accountQueue,)).start()

if __name__ == '__main__':
    main()

Manager().Queue() gives this error: FileNotFoundError: [Errno 2] No such file or directory. I'm not sure why I'm getting a FileNotFound error when all it uses is a Queue().

Queue() gives this error at the line marked ****: RuntimeError: Queue objects should only be shared between processes through inheritance

The full traceback for the Manager().Queue() case:

[CheckBirthdays] Initated
Process Process-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 18, in createSchedule
    schedule.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 780, in run_pending
    default_scheduler.run_pending()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 100, in run_pending
    self._run_job(job)
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 172, in _run_job
    ret = job.run()
  File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 661, in run
    ret = self.job_func()
  File "/home/tehpirate/Documents/Kelloggs/test.py", line 10, in checkBirthdays
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
  File "<string>", line 2, in empty
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

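I only added manager.join() (and, obviously, shortened the schedule interval to seconds), as shown in the documentation, to keep the main process running and the child process alive. If I remove the manager.join() line, your error message appears immediately.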

from multiprocessing import Process, Manager, Queue
import schedule
import time

def checkBirthdays(accountQueue):
    print('[CheckBirthdays] Initated')
    acc = {
        'email': 'demo@test.com'
    }
    accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
    accountProcessing.append(acc)
    accountQueue.put(accountProcessing) # ****

def createSchedule(accountQueue):
    # run the job every second (shortened from the daily 23:51 job for testing)
    schedule.every(1).seconds.do(checkBirthdays, accountQueue)
    while True:
        schedule.run_pending()
        # poll for due jobs once per second
        time.sleep(1)

def main():
    # FileNotFoundError: [Errno 2] No such file or directory
    manager = Manager()
    accountQueue = manager.Queue()

    # RuntimeError: Queue objects should only be shared between processes through inheritance
    # accountQueue = Queue()
    schedule = Process(target=createSchedule, args=(accountQueue,)).start()
    manager.join()  # <--------------------------------

if __name__ == '__main__':
    main()

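This happens because, in this setup, an internal mechanism handles the inter-process communication through a socket. If the main process or one of its child processes goes away, the communication channel is closed with it, which is why the error ends up complaining that there is no such "file": on Unix-based systems a socket is still a (kind of) file. In the original code, main() returns right after starting the child, so the manager is cleaned up while the child still needs it.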
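The failure can be reproduced in a single process. The following is a minimal sketch, assuming the manager is stopped explicitly with shutdown() before its queue proxy is used; call_after_shutdown is a hypothetical helper name, and the exact exception class may differ between platforms.

```python
from multiprocessing import Manager


def call_after_shutdown():
    """Return the name of the exception raised by a proxy call that is made
    after the manager's server process has been shut down."""
    manager = Manager()
    account_queue = manager.Queue()
    manager.shutdown()  # stops the server process that owns the real queue
    try:
        account_queue.empty()  # the proxy now has to reach a server that is gone
    except (OSError, EOFError) as exc:
        return type(exc).__name__
    return None


if __name__ == '__main__':
    # The printed class name depends on the platform; on Linux it is
    # usually the same FileNotFoundError as in the traceback above.
    print(call_after_shutdown())
```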

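What manager.join() really does is make the main process wait on the manager's own process. While that process is still running, cleanup is postponed and the program keeps running; only once it has finished (last program line executed, crash, etc.) does join() return and let teardown proceed.

Conceptually, a wait like this behaves like the following high-level sketch: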

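import time

# Conceptual pseudocode: get_all_children() and discard_if_possible() are
# hypothetical helpers, not part of multiprocessing.
while True:  # executed in the main process
    for child in get_all_children():
        state = discard_if_possible(child)
        # do stuff with the final state from the child
    time.sleep(0.5)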

To dig deeper: the manager's _process attribute is nothing more than a Process instance, and manager.join() calls join() on it.
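This can be checked directly. The sketch below assumes CPython's current implementation, where the private _process attribute holds the server process and join() accepts a timeout; private attributes can change between versions, and inspect_manager_process is a hypothetical helper name.

```python
from multiprocessing import Manager
from multiprocessing.process import BaseProcess


def inspect_manager_process():
    """Return (is_process, still_alive) for the manager's server process."""
    manager = Manager()
    try:
        server = manager._process  # private attribute, implementation detail
        is_process = isinstance(server, BaseProcess)
        # With a timeout, join() gives up waiting while the server keeps serving.
        manager.join(timeout=0.1)
        still_alive = server.is_alive()
        return is_process, still_alive
    finally:
        manager.shutdown()


if __name__ == '__main__':
    print(inspect_manager_process())
```

Without a timeout, join() blocks for as long as the server process runs, which is what keeps the main process (and with it the manager) alive in the fixed code above.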

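Finally, consider using the manager as a context manager to prevent shutdown/cleanup problems as well. with, __enter__() and __exit__() exist for a reason, especially in multiprocessing codebases.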
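A minimal sketch of the context-manager variant, assuming a short-lived worker instead of the endless schedule loop so that the example terminates; add_account and the snake_case names are illustrative, not taken from the question. The manager stays alive for the whole with block and is shut down when the block exits, and the child is joined inside the block so it never outlives the manager.

```python
from multiprocessing import Manager, Process


def add_account(account_queue):
    # Runs in the child process and talks to the queue through the manager.
    account_queue.put({'email': 'demo@test.com'})


def main():
    with Manager() as manager:  # the manager is shut down when the block exits
        account_queue = manager.Queue()
        worker = Process(target=add_account, args=(account_queue,))
        worker.start()
        worker.join()  # wait for the child while the manager is still alive
        return account_queue.get()


if __name__ == '__main__':
    print(main())
```

With a worker that loops forever, as createSchedule does, worker.join() would block indefinitely in the same way manager.join() does in the fixed code.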


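Edit: the code runs fine. Either you have not provided a minimal working example that reproduces your problem, or you have not applied the suggested fix.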

/tmp$ mkdir mpanswer && cd mpanswer
/tmp/mpanswer$ cat << EOF > main.py 
> from multiprocessing import Process, Manager, Queue
> import schedule
> import time
> 
> def checkBirthdays(accountQueue):
>     print('[CheckBirthdays] Initated')
>     acc = {
>         'email': 'demo@test.com'
>     }
>     accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
>     accountProcessing.append(acc)
>     accountQueue.put(accountProcessing) # ****
> 
> def createSchedule(accountQueue):
>     # similar to cron, executes at a certain time
>     schedule.every(1).seconds.do(checkBirthdays, accountQueue)
>     while True:
>         schedule.run_pending()
>         # check every 60 seconds
>         time.sleep(1)
> 
> def main():
>     # FileNotFoundError: [Errno 2] No such file or directory
>     manager = Manager()
>     accountQueue = manager.Queue()
> 
>     # RuntimeError: Queue objects should only be shared between processes through inheritance
>     # accountQueue = Queue()
>     schedule = Process(target=createSchedule, args=(accountQueue,)).start()
>     manager.join()  # <--------------------------------
> 
> if __name__ == '__main__':
>     main()
> EOF
/tmp/mpanswer$ cat << EOF > Dockerfile 
> FROM python:alpine
> COPY main.py main.py
> RUN pip install schedule
> CMD python main.py
> EOF
/tmp/mpanswer$ docker build --no-cache --tag mpanswer . && docker run --rm -it mpanswer
Sending build context to Docker daemon  4.096kB
Step 1/4 : FROM python:alpine
 ---> 03c59395ddea
Step 2/4 : COPY main.py main.py
 ---> 4ebcad402bf1
Step 3/4 : RUN pip install schedule
 ---> Running in 3b4dcc189d48
Collecting schedule
  Downloading schedule-1.1.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: schedule
Successfully installed schedule-1.1.0
WARNING: You are using pip version 21.0; however, version 21.2.4 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
Removing intermediate container 3b4dcc189d48
 ---> 039c1a333a5d
Step 4/4 : CMD python main.py
 ---> Running in 9dacf9084f28
Removing intermediate container 9dacf9084f28
 ---> de1391085794
Successfully built de1391085794
Successfully tagged mpanswer:latest
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
^C
/tmp/mpanswer$