如何以异步方式执行 ZeroMQ PUSH/PULL 原型?
How to execute a ZeroMQ PUSH/PULL archetype in an asynchronous way?
我想在一个端口发起一个PULL
并且想从其他端口接收到我的PULL
端口。在 PULL
端口的情况下,它异步侦听,当它收到消息时,它只是在控制台中打印消息。因此,为此我在 Push
-class 中编写了一个方法,它将消息发送到 PULL
端口。
我的代码如下:
import random
import zmq
import time
import sys
import string
import asyncio
import zmq.asyncio
class Push():
def __init__(self, port, addr='localhost'):
self.port = port
self.addr = addr
self.ctx = zmq.Context()
self.scoket = self.ctx.socket(zmq.PUSH)
self.scoket.connect(f'tcp://{self.addr}:{selfa.port}')
def send(self):
chars = string.ascii_uppercase + string.ascii_lowercase
message = ''.join(random.choice(chars) for _ in range(4))
self.scoket.send(bytes(message, 'utf-8'))
print(f'sending: {message}')
class Pull():
def __init__(self, port, addr='*'):
self.port = port
self.addr = addr
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.bind(f'tcp://{self.addr}:{self.port}')
async def listen(self, listener):
while True:
string = await self.socket.recv()
listener(string)
if __name__ == '__main__':
push = Push('55501')
async def send():
while True:
await asyncio.sleep(5)
print('Sending...')
push.send()
pull = Pull('55501')
try:
asyncio.run(
pull.listen(print),
send(),
)
except KeyboardInterrupt:
print('exiting...')
exit()
上面的代码不是运行。代码在 listen 方法处停止。
#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress
class ConcurrentTasks:
def __init__(self, board):
self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
### START: NEW CODE THAT RESOLVED THE ISSUE
async def pingsonar(self):
value = await self.board.sonar_read(self.trigger_pin)
return value
async def readsonar(self):
while True:
rep_recv = await self.rep.recv()
value = await asyncio.wait([self.pingsonar()])
valuesonar = list(value[0])[0].result()
json_data = json.dumps(valuesonar)
await self.rep.send(json_data.encode())
await asyncio.sleep(1 / 1000) #maybe this line isn't necessary
### END : NEW CODE THAT RESOLVED THE ISSUE
async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()
The above code is not running.
代码是运行,
但是源代码中有错误(概念上的)和拼写错误。
代码原样禁止 Push
-class 成为 .connect()
-ed,因此是 Pull
-对方 .bind()
-s,但没有人可以交谈。
解决方案
1 )
修复此拼写错误(+更好,明确检测并处理所有潜在的错误状态)
self.scoket.connect(f'tcp://{self.addr}:{selfa.port}') # this will NEVER FLY
#--------------------------------------------^
self.scoket.connect(f'tcp://{self.addr}:{self.port}') # this will ... ( + detect Error-state(s)
2 )
更正概念 - 混合唾手可得的成果(同时使用多个异步框架,一个与所有其他框架对抗)是一个标志对专业工程在可靠和足够稳健方面的责任的浅薄理解 distributed-computing(控制概念中的缺陷,例如在月球上着陆阿波罗 11 号期间,或者相反, 切尔诺贝利式的错误管理系统 + 心态, 是干净的, 鼓舞人心的和警告的足够的例子 ( 如果不禁止 + 检测 + 纠正 + 惩罚 ) 可以 & 会伤害一次又一次).
如果您从未使用过 ZeroMQ,在深入了解更多细节之前,您可能会喜欢先看看
最好的下一步
如果确实想达到专业水平,请从 Pieter Hintjens 的书开始 "Code Connected, Volume 1" - 值得花时间,值得努力,值得理解那里讨论的概念。
我想在一个端口发起一个PULL
并且想从其他端口接收到我的PULL
端口。在 PULL
端口的情况下,它异步侦听,当它收到消息时,它只是在控制台中打印消息。因此,为此我在 Push
-class 中编写了一个方法,它将消息发送到 PULL
端口。
我的代码如下:
import random
import zmq
import time
import sys
import string
import asyncio
import zmq.asyncio
class Push():
def __init__(self, port, addr='localhost'):
self.port = port
self.addr = addr
self.ctx = zmq.Context()
self.scoket = self.ctx.socket(zmq.PUSH)
self.scoket.connect(f'tcp://{self.addr}:{selfa.port}')
def send(self):
chars = string.ascii_uppercase + string.ascii_lowercase
message = ''.join(random.choice(chars) for _ in range(4))
self.scoket.send(bytes(message, 'utf-8'))
print(f'sending: {message}')
class Pull():
def __init__(self, port, addr='*'):
self.port = port
self.addr = addr
self.ctx = zmq.Context()
self.socket = self.ctx.socket(zmq.PULL)
self.socket.bind(f'tcp://{self.addr}:{self.port}')
async def listen(self, listener):
while True:
string = await self.socket.recv()
listener(string)
if __name__ == '__main__':
push = Push('55501')
async def send():
while True:
await asyncio.sleep(5)
print('Sending...')
push.send()
pull = Pull('55501')
try:
asyncio.run(
pull.listen(print),
send(),
)
except KeyboardInterrupt:
print('exiting...')
exit()
上面的代码不是运行。代码在 listen 方法处停止。
#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress
class ConcurrentTasks:
def __init__(self, board):
self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
### START: NEW CODE THAT RESOLVED THE ISSUE
async def pingsonar(self):
value = await self.board.sonar_read(self.trigger_pin)
return value
async def readsonar(self):
while True:
rep_recv = await self.rep.recv()
value = await asyncio.wait([self.pingsonar()])
valuesonar = list(value[0])[0].result()
json_data = json.dumps(valuesonar)
await self.rep.send(json_data.encode())
await asyncio.sleep(1 / 1000) #maybe this line isn't necessary
### END : NEW CODE THAT RESOLVED THE ISSUE
async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()
The above code is not running.
代码是运行,
但是源代码中有错误(概念上的)和拼写错误。
代码原样禁止 Push
-class 成为 .connect()
-ed,因此是 Pull
-对方 .bind()
-s,但没有人可以交谈。
解决方案
1 )
修复此拼写错误(+更好,明确检测并处理所有潜在的错误状态)
self.scoket.connect(f'tcp://{self.addr}:{selfa.port}') # this will NEVER FLY
#--------------------------------------------^
self.scoket.connect(f'tcp://{self.addr}:{self.port}') # this will ... ( + detect Error-state(s)
2 )
更正概念 - 混合唾手可得的成果(同时使用多个异步框架,一个与所有其他框架对抗)是一个标志对专业工程在可靠和足够稳健方面的责任的浅薄理解 distributed-computing(控制概念中的缺陷,例如在月球上着陆阿波罗 11 号期间,或者相反, 切尔诺贝利式的错误管理系统 + 心态, 是干净的, 鼓舞人心的和警告的足够的例子 ( 如果不禁止 + 检测 + 纠正 + 惩罚 ) 可以 & 会伤害一次又一次).
如果您从未使用过 ZeroMQ,在深入了解更多细节之前,您可能会喜欢先看看
最好的下一步
如果确实想达到专业水平,请从 Pieter Hintjens 的书开始 "Code Connected, Volume 1" - 值得花时间,值得努力,值得理解那里讨论的概念。