Python 多个进程的队列继承问题
Python Queue inheritance issues with multiple processes
from multiprocessing import Process, Manager, Queue
import schedule
import time
def checkBirthdays(accountQueue):
print('[CheckBirthdays] Initated')
acc = {
'email': 'demo@test.com'
}
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
accountProcessing.append(acc)
accountQueue.put(accountProcessing) # ****
def createSchedule(accountQueue):
# similar to cron, executes at a certain time
schedule.every().day.at("23:51").do(checkBirthdays, accountQueue)
while True:
schedule.run_pending()
# check every 60 seconds
time.sleep(60)
def main():
# FileNotFoundError: [Errno 2] No such file or directory
manager = Manager()
accountQueue = manager.Queue()
# RuntimeError: Queue objects should only be shared between processes through inheritance
# accountQueue = Queue()
schedule = Process(target=createSchedule, args=(accountQueue,)).start()
if __name__ == '__main__':
main()
Manager().Queue() 给出此错误 FileNotFoundError: [Errno 2] No such file or directory
。当我收到 FileNotFound 错误时我不确定,它使用 Queue()
加载泡菜文件
Queue() 在 ****
标记处给出此错误 RuntimeError: Queue objects should only be shared between processes through inheritance
[CheckBirthdays] Initated
Process Process-2:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/tehpirate/Documents/Kelloggs/test.py", line 18, in createSchedule
schedule.run_pending()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 780, in run_pending
default_scheduler.run_pending()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 100, in run_pending
self._run_job(job)
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 172, in _run_job
ret = job.run()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 661, in run
ret = self.job_func()
File "/home/tehpirate/Documents/Kelloggs/test.py", line 10, in checkBirthdays
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
File "<string>", line 2, in empty
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
self._connect()
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 502, in Client
c = SocketClient(address)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
我只添加了 Manager.join()
(显然调整了秒数),如文档中所示,以保持主机 运行 和从机处于活动状态。如果我删除该行 manager.join()
,它会立即显示您的错误消息。
from multiprocessing import Process, Manager, Queue
import schedule
import time
def checkBirthdays(accountQueue):
print('[CheckBirthdays] Initated')
acc = {
'email': 'demo@test.com'
}
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
accountProcessing.append(acc)
accountQueue.put(accountProcessing) # ****
def createSchedule(accountQueue):
# similar to cron, executes at a certain time
schedule.every(1).seconds.do(checkBirthdays, accountQueue)
while True:
schedule.run_pending()
# check every 60 seconds
time.sleep(1)
def main():
# FileNotFoundError: [Errno 2] No such file or directory
manager = Manager()
accountQueue = manager.Queue()
# RuntimeError: Queue objects should only be shared between processes through inheritance
# accountQueue = Queue()
schedule = Process(target=createSchedule, args=(accountQueue,)).start()
manager.join() # <--------------------------------
if __name__ == '__main__':
main()
这是因为在这种情况下有一个内部机制通过 Socket 处理进程间通信。如果一个主人或它的一个奴隶被杀死,通信隧道也会关闭,因此它会说最后没有这样的“文件”,套接字在基于 Unix 的系统上仍然是一种(某种)文件。
manager.join()
真正做的是检查主控(您的 manager
)是否可以丢弃从属进程,因为它们不再有用(最后执行的程序行、崩溃等)或者是否将此操作“重新安排”到未来并保持 运行.
类似这样的高级代码草图:
import time
while True: # executed in the master process
for slave in get_all_slaves():
state = discard_if_posible(slave)
# do stuff with the final state from the slave
time.sleep(0.5)
为了更深入,Manager._process
是 nothing more than a Process
instance,这里是 join()
:
最后,考虑为 类 使用上下文管理器以防止 shutdown/cleanup 问题也发生。 with
、__enter__()
和 __exit__()
是有原因的,尤其是在多处理代码库中。
编辑:代码运行良好。要么您没有提供重现您的问题的最小工作示例,要么您没有通过建议的答案修复它。
/tmp$ mkdir mpanswer && cd mpanswer
/tmp/mpanswer$ cat << EOF > main.py
> from multiprocessing import Process, Manager, Queue
> import schedule
> import time
>
> def checkBirthdays(accountQueue):
> print('[CheckBirthdays] Initated')
> acc = {
> 'email': 'demo@test.com'
> }
> accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
> accountProcessing.append(acc)
> accountQueue.put(accountProcessing) # ****
>
> def createSchedule(accountQueue):
> # similar to cron, executes at a certain time
> schedule.every(1).seconds.do(checkBirthdays, accountQueue)
> while True:
> schedule.run_pending()
> # check every 60 seconds
> time.sleep(1)
>
> def main():
> # FileNotFoundError: [Errno 2] No such file or directory
> manager = Manager()
> accountQueue = manager.Queue()
>
> # RuntimeError: Queue objects should only be shared between processes through inheritance
> # accountQueue = Queue()
> schedule = Process(target=createSchedule, args=(accountQueue,)).start()
> manager.join() # <--------------------------------
>
> if __name__ == '__main__':
> main()
> EOF
/tmp/mpanswer$ cat << EOF > Dockerfile
> FROM python:alpine
> COPY main.py main.py
> RUN pip install schedule
> CMD python main.py
> EOF
/tmp/mpanswer$ docker build --no-cache --tag mpanswer . && docker run --rm -it mpanswer
Sending build context to Docker daemon 4.096kB
Step 1/4 : FROM python:alpine
---> 03c59395ddea
Step 2/4 : COPY main.py main.py
---> 4ebcad402bf1
Step 3/4 : RUN pip install schedule
---> Running in 3b4dcc189d48
Collecting schedule
Downloading schedule-1.1.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: schedule
Successfully installed schedule-1.1.0
WARNING: You are using pip version 21.0; however, version 21.2.4 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
Removing intermediate container 3b4dcc189d48
---> 039c1a333a5d
Step 4/4 : CMD python main.py
---> Running in 9dacf9084f28
Removing intermediate container 9dacf9084f28
---> de1391085794
Successfully built de1391085794
Successfully tagged mpanswer:latest
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
^C
/tmp/mpanswer$
from multiprocessing import Process, Manager, Queue
import schedule
import time
def checkBirthdays(accountQueue):
print('[CheckBirthdays] Initated')
acc = {
'email': 'demo@test.com'
}
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
accountProcessing.append(acc)
accountQueue.put(accountProcessing) # ****
def createSchedule(accountQueue):
# similar to cron, executes at a certain time
schedule.every().day.at("23:51").do(checkBirthdays, accountQueue)
while True:
schedule.run_pending()
# check every 60 seconds
time.sleep(60)
def main():
# FileNotFoundError: [Errno 2] No such file or directory
manager = Manager()
accountQueue = manager.Queue()
# RuntimeError: Queue objects should only be shared between processes through inheritance
# accountQueue = Queue()
schedule = Process(target=createSchedule, args=(accountQueue,)).start()
if __name__ == '__main__':
main()
Manager().Queue() 给出此错误 FileNotFoundError: [Errno 2] No such file or directory
。当我收到 FileNotFound 错误时我不确定,它使用 Queue()
Queue() 在 ****
标记处给出此错误 RuntimeError: Queue objects should only be shared between processes through inheritance
[CheckBirthdays] Initated
Process Process-2:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/tehpirate/Documents/Kelloggs/test.py", line 18, in createSchedule
schedule.run_pending()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 780, in run_pending
default_scheduler.run_pending()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 100, in run_pending
self._run_job(job)
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 172, in _run_job
ret = job.run()
File "/home/tehpirate/.local/lib/python3.8/site-packages/schedule/__init__.py", line 661, in run
ret = self.job_func()
File "/home/tehpirate/Documents/Kelloggs/test.py", line 10, in checkBirthdays
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
File "<string>", line 2, in empty
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
self._connect()
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 502, in Client
c = SocketClient(address)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 629, in SocketClient
s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
我只添加了 Manager.join()
(显然调整了秒数),如文档中所示,以保持主机 运行 和从机处于活动状态。如果我删除该行 manager.join()
,它会立即显示您的错误消息。
from multiprocessing import Process, Manager, Queue
import schedule
import time
def checkBirthdays(accountQueue):
print('[CheckBirthdays] Initated')
acc = {
'email': 'demo@test.com'
}
accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
accountProcessing.append(acc)
accountQueue.put(accountProcessing) # ****
def createSchedule(accountQueue):
# similar to cron, executes at a certain time
schedule.every(1).seconds.do(checkBirthdays, accountQueue)
while True:
schedule.run_pending()
# check every 60 seconds
time.sleep(1)
def main():
# FileNotFoundError: [Errno 2] No such file or directory
manager = Manager()
accountQueue = manager.Queue()
# RuntimeError: Queue objects should only be shared between processes through inheritance
# accountQueue = Queue()
schedule = Process(target=createSchedule, args=(accountQueue,)).start()
manager.join() # <--------------------------------
if __name__ == '__main__':
main()
这是因为在这种情况下有一个内部机制通过 Socket 处理进程间通信。如果一个主人或它的一个奴隶被杀死,通信隧道也会关闭,因此它会说最后没有这样的“文件”,套接字在基于 Unix 的系统上仍然是一种(某种)文件。
manager.join()
真正做的是检查主控(您的 manager
)是否可以丢弃从属进程,因为它们不再有用(最后执行的程序行、崩溃等)或者是否将此操作“重新安排”到未来并保持 运行.
类似这样的高级代码草图:
import time
while True: # executed in the master process
for slave in get_all_slaves():
state = discard_if_posible(slave)
# do stuff with the final state from the slave
time.sleep(0.5)
为了更深入,Manager._process
是 nothing more than a Process
instance,这里是 join()
:
最后,考虑为 类 使用上下文管理器以防止 shutdown/cleanup 问题也发生。 with
、__enter__()
和 __exit__()
是有原因的,尤其是在多处理代码库中。
编辑:代码运行良好。要么您没有提供重现您的问题的最小工作示例,要么您没有通过建议的答案修复它。
/tmp$ mkdir mpanswer && cd mpanswer
/tmp/mpanswer$ cat << EOF > main.py
> from multiprocessing import Process, Manager, Queue
> import schedule
> import time
>
> def checkBirthdays(accountQueue):
> print('[CheckBirthdays] Initated')
> acc = {
> 'email': 'demo@test.com'
> }
> accountProcessing = [] if accountQueue.empty() else list(accountQueue.get())
> accountProcessing.append(acc)
> accountQueue.put(accountProcessing) # ****
>
> def createSchedule(accountQueue):
> # similar to cron, executes at a certain time
> schedule.every(1).seconds.do(checkBirthdays, accountQueue)
> while True:
> schedule.run_pending()
> # check every 60 seconds
> time.sleep(1)
>
> def main():
> # FileNotFoundError: [Errno 2] No such file or directory
> manager = Manager()
> accountQueue = manager.Queue()
>
> # RuntimeError: Queue objects should only be shared between processes through inheritance
> # accountQueue = Queue()
> schedule = Process(target=createSchedule, args=(accountQueue,)).start()
> manager.join() # <--------------------------------
>
> if __name__ == '__main__':
> main()
> EOF
/tmp/mpanswer$ cat << EOF > Dockerfile
> FROM python:alpine
> COPY main.py main.py
> RUN pip install schedule
> CMD python main.py
> EOF
/tmp/mpanswer$ docker build --no-cache --tag mpanswer . && docker run --rm -it mpanswer
Sending build context to Docker daemon 4.096kB
Step 1/4 : FROM python:alpine
---> 03c59395ddea
Step 2/4 : COPY main.py main.py
---> 4ebcad402bf1
Step 3/4 : RUN pip install schedule
---> Running in 3b4dcc189d48
Collecting schedule
Downloading schedule-1.1.0-py2.py3-none-any.whl (10 kB)
Installing collected packages: schedule
Successfully installed schedule-1.1.0
WARNING: You are using pip version 21.0; however, version 21.2.4 is available.
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.
Removing intermediate container 3b4dcc189d48
---> 039c1a333a5d
Step 4/4 : CMD python main.py
---> Running in 9dacf9084f28
Removing intermediate container 9dacf9084f28
---> de1391085794
Successfully built de1391085794
Successfully tagged mpanswer:latest
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
[CheckBirthdays] Initated
^C
/tmp/mpanswer$