如何在 pyzmq 和 asyncio 中使用 REQ 和 REP?
How to use REQ and REP in pyzmq with asyncio?
我正在尝试在 python3.5 中使用 pyzmq 和 asyncio 实现异步客户端和服务器。我使用了 zmq 提供的 asyncio 库。下面是我的客户端(requester.py)和服务器(responder.py)代码。我的要求是只使用 REQ 和 REP zmq 套接字来实现异步客户端-服务器。
requester.py:
import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received reply ", "[", message, "]")
return message
async def send(i):
print("Sending request ", i,"...")
request = "Hello:" + str(i)
await socket.send(request.encode('utf-8'))
print("sent:",i)
async def main_loop_num(i):
await send(i)
# Get the reply.
message = await receive()
print("Message :", message)
async def main():
await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main()]))
responder.py:
import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received message:", message)
await asyncio.sleep(10)
print("Sleep complete")
return message
async def main_loop():
while True:
message = await receive()
print("back to main loop")
await socket.send(("World from %d" % port).encode('utf-8'))
print("sent back")
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main_loop()]))
我得到的输出是:
requester.py:
Sending request 5 ...
sent: 5
Sending request 6 ...
Sending request 1 ...
Sending request 7 ...
Sending request 2 ...
Sending request 8 ...
Sending request 3 ...
Sending request 9 ...
Sending request 4 ...
responder.py:
Received message: b'Hello:5'
Sleep complete
back to main loop
sent back
从输出来看,我假设请求者发送了多个请求,但只有第一个请求到达了响应者。此外,响应者为第一个请求发送的响应甚至没有返回到请求者。为什么会这样?我在所有可能的地方都使用了异步方法,但 send() 和 recv() 方法仍然没有异步运行。是否可以在不使用路由器、经销商等任何其他套接字的情况下进行异步请求代表?
ZMQs REQ-REP 套接字期望一个请求的严格顺序 - 一个回复 - 一个请求 - 一个回复 - ...
您的 requester.py 并行启动所有 10 个请求:
await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))
当发送第二个请求时 ZMQ 抱怨:
zmq.error.ZMQError: Operation cannot be accomplished in current state
尝试将您的主要功能更改为一次发送一个请求:
async def main():
for i in range(1, 10):
await main_loop_num(i)
如果您需要并行发送多个请求,那么您不能使用 REQ-REP 套接字对,例如 DEALER-REP 套接字对。
我正在尝试在 python3.5 中使用 pyzmq 和 asyncio 实现异步客户端和服务器。我使用了 zmq 提供的 asyncio 库。下面是我的客户端(requester.py)和服务器(responder.py)代码。我的要求是只使用 REQ 和 REP zmq 套接字来实现异步客户端-服务器。
requester.py:
import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received reply ", "[", message, "]")
return message
async def send(i):
print("Sending request ", i,"...")
request = "Hello:" + str(i)
await socket.send(request.encode('utf-8'))
print("sent:",i)
async def main_loop_num(i):
await send(i)
# Get the reply.
message = await receive()
print("Message :", message)
async def main():
await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main()]))
responder.py:
import asyncio
import zmq
import zmq.asyncio
async def receive():
message = await socket.recv()
print("Received message:", message)
await asyncio.sleep(10)
print("Sleep complete")
return message
async def main_loop():
while True:
message = await receive()
print("back to main loop")
await socket.send(("World from %d" % port).encode('utf-8'))
print("sent back")
port = 5556
context = zmq.asyncio.Context.instance()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main_loop()]))
我得到的输出是:
requester.py:
Sending request 5 ...
sent: 5
Sending request 6 ...
Sending request 1 ...
Sending request 7 ...
Sending request 2 ...
Sending request 8 ...
Sending request 3 ...
Sending request 9 ...
Sending request 4 ...
responder.py:
Received message: b'Hello:5'
Sleep complete
back to main loop
sent back
从输出来看,我假设请求者发送了多个请求,但只有第一个请求到达了响应者。此外,响应者为第一个请求发送的响应甚至没有返回到请求者。为什么会这样?我在所有可能的地方都使用了异步方法,但 send() 和 recv() 方法仍然没有异步运行。是否可以在不使用路由器、经销商等任何其他套接字的情况下进行异步请求代表?
ZMQs REQ-REP 套接字期望一个请求的严格顺序 - 一个回复 - 一个请求 - 一个回复 - ...
您的 requester.py 并行启动所有 10 个请求:
await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))
当发送第二个请求时 ZMQ 抱怨:
zmq.error.ZMQError: Operation cannot be accomplished in current state
尝试将您的主要功能更改为一次发送一个请求:
async def main():
for i in range(1, 10):
await main_loop_num(i)
如果您需要并行发送多个请求,那么您不能使用 REQ-REP 套接字对,例如 DEALER-REP 套接字对。