如何在具有多处理的无限循环程序中发出请求
How to make requests while program in infinite loop with multiprocessing
我有两个功能需要同时 运行。 read_card
需要无限循环 运行 并等待新卡(实际上是 Nrf reader)和
将一些字符串添加到队列中,send_data
假设从队列中获取值并通过请求将它们发送到服务器 library.Everything 在我不使用多处理时工作。但我想我需要并发。
这是我的两个函数。
def read_card(reader, configs):
print("First started")
while True:
authorized_uid = reader.is_granted(reader.read())
print("Waiting for card")
#TODO:If not authorized in AccessList.txt look to the server
if authorized_uid is not None:
print(authorized_uid)
open_door()
check_model = CheckModel(configs.DeviceSerialNumber, authorized_uid)
message_helper.put_message(check_model)
def send_data(sender):
print("Second started")
while True:
message_model = message_helper.get_message()
if message_model is not None:
sender.send_message(message_model)
下面是我如何调用 main
def main():
download_settings()
create_folders()
settings = read_settings()
accessList = get_user_list(settings)
configure_scheduler(settings)
message_sender = MessageSender(client.check,client.bulk)
reader_process = multiprocessing.Process(name = "reader_loop", target = read_card, args=(Reader(accessList, entryLogger),configs,))
message_process = multiprocessing.Process(name = "message_loop", target = send_data, args=(message_sender,))
reader_process.start()
message_process.start()
if __name__ == '__main__':
main()
那些是为了调试。我从不同的 类.
打印了 put_message
和 send_message
def send_message(self,model):
print(model)
return self.checkClient.check(model)
def put_message(self, message):
print(message)
self.put_to_queue(self.queue, message)
self.put_to_db(message)
我希望在终端中看到一些对象名称,但我只看到下面。 reader 也不起作用。
First started
Second started
我哪里做错了?
使用Queue
在进程之间进行通信。然后,当您在 reader
中读取卡片时,创建一个新作业并将其推入队列,然后将此作业弹出到处理器中并发送请求。
这是一个概念证明:
from datetime import datetime
from multiprocessing import Process, Queue
from random import random
from time import sleep
import requests
def reader(q: Queue):
while True:
# create a job
job = {'date': datetime.now().isoformat(), 'number': random()}
q.put(job)
# use a proper logger instead of printing,
# otherwise you'll get mangled output!
print('Enqueued new job', job)
sleep(5)
def client(q: Queue):
while True:
# wait for a new job
job = q.get()
res = requests.post(url='https://httpbin.org/post',
data=job)
res.raise_for_status()
json = res.json()
print(json['form'])
if __name__ == '__main__':
q = Queue()
reader_proc = Process(name='reader', target=reader, args=(q,))
client_proc = Process(name='client', target=client, args=(q,))
procs = [reader_proc, client_proc]
for p in procs:
print(f'{p.name} started')
p.start()
for p in procs:
p.join()
打印:
reader started
client started
Enqueued new job {'date': '2019-07-01T15:51:53.100395', 'number': 0.7659293922700549}
{'date': '2019-07-01T15:51:53.100395', 'number': '0.7659293922700549'}
Enqueued new job {'date': '2019-07-01T15:51:58.116020', 'number': 0.14306347124900576}
{'date': '2019-07-01T15:51:58.116020', 'number': '0.14306347124900576'}
我有两个功能需要同时 运行。 read_card
需要无限循环 运行 并等待新卡(实际上是 Nrf reader)和
将一些字符串添加到队列中,send_data
假设从队列中获取值并通过请求将它们发送到服务器 library.Everything 在我不使用多处理时工作。但我想我需要并发。
这是我的两个函数。
def read_card(reader, configs):
print("First started")
while True:
authorized_uid = reader.is_granted(reader.read())
print("Waiting for card")
#TODO:If not authorized in AccessList.txt look to the server
if authorized_uid is not None:
print(authorized_uid)
open_door()
check_model = CheckModel(configs.DeviceSerialNumber, authorized_uid)
message_helper.put_message(check_model)
def send_data(sender):
print("Second started")
while True:
message_model = message_helper.get_message()
if message_model is not None:
sender.send_message(message_model)
下面是我如何调用 main
def main():
download_settings()
create_folders()
settings = read_settings()
accessList = get_user_list(settings)
configure_scheduler(settings)
message_sender = MessageSender(client.check,client.bulk)
reader_process = multiprocessing.Process(name = "reader_loop", target = read_card, args=(Reader(accessList, entryLogger),configs,))
message_process = multiprocessing.Process(name = "message_loop", target = send_data, args=(message_sender,))
reader_process.start()
message_process.start()
if __name__ == '__main__':
main()
那些是为了调试。我从不同的 类.
打印了put_message
和 send_message
def send_message(self,model):
print(model)
return self.checkClient.check(model)
def put_message(self, message):
print(message)
self.put_to_queue(self.queue, message)
self.put_to_db(message)
我希望在终端中看到一些对象名称,但我只看到下面。 reader 也不起作用。
First started Second started
我哪里做错了?
使用Queue
在进程之间进行通信。然后,当您在 reader
中读取卡片时,创建一个新作业并将其推入队列,然后将此作业弹出到处理器中并发送请求。
这是一个概念证明:
from datetime import datetime
from multiprocessing import Process, Queue
from random import random
from time import sleep
import requests
def reader(q: Queue):
while True:
# create a job
job = {'date': datetime.now().isoformat(), 'number': random()}
q.put(job)
# use a proper logger instead of printing,
# otherwise you'll get mangled output!
print('Enqueued new job', job)
sleep(5)
def client(q: Queue):
while True:
# wait for a new job
job = q.get()
res = requests.post(url='https://httpbin.org/post',
data=job)
res.raise_for_status()
json = res.json()
print(json['form'])
if __name__ == '__main__':
q = Queue()
reader_proc = Process(name='reader', target=reader, args=(q,))
client_proc = Process(name='client', target=client, args=(q,))
procs = [reader_proc, client_proc]
for p in procs:
print(f'{p.name} started')
p.start()
for p in procs:
p.join()
打印:
reader started
client started
Enqueued new job {'date': '2019-07-01T15:51:53.100395', 'number': 0.7659293922700549}
{'date': '2019-07-01T15:51:53.100395', 'number': '0.7659293922700549'}
Enqueued new job {'date': '2019-07-01T15:51:58.116020', 'number': 0.14306347124900576}
{'date': '2019-07-01T15:51:58.116020', 'number': '0.14306347124900576'}