使用asyncio使zeromq客户端服务器并发

making zeromq client server concurrent using asyncio

我正在尝试在 python 中使用并发模型 (asyncio) 使用 zeroMQ

编写客户端服务器

我有下面的服务器代码 server.py

import zmq


class Server:

    def __init__(self, port_number):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        server_url = "tcp://127.0.0.1:" + str(port_number)
        self.socket.bind(server_url)

    async def receive(self):
        while True:
            msg = await self.socket.recv()
            print("Got", msg)
            self.socket.send(msg)

我下面有客户端代码client.py下面

import zmq


class Client:
    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)

    def connect(self):
        self.socket.connect("tcp://127.0.0.1:5000")
        self.socket.connect("tcp://127.0.0.1:6000")

    async def send_msg(self):
        for i in range(10):
            msg = "msg {0}".format(i)
            await self.socket.send(bytes(msg, 'utf-8'))
            print("Sending ", msg)
            await self.socket.recv()            

下面给出asyncio方式调用客户端和服务端的代码

import asyncio
import time
from server import Server
from client import Client
import logging


async def server(port_num):
    logging.info('Starting server on port 5000')
    s1 = Server(port_num)
    await s1.receive()


async def client():
    logging.info('Starting client')
    c1 = Client()
    c1.send_msg()


async def main_thread():
    await asyncio.gather(server(5000), server(6000), client())

if __name__ == '__main__':
    s = time.perf_counter()
    logging.info("Main    : before creating thread")

    asyncio.run(main_thread())

    logging.info("Main    : before running thread")
    elapsed = time.perf_counter() - s

    print(f'{__file__} finished executing in {elapsed:0.2f} seconds')

当我 运行 程序在函数 await s1.receive() 处无限等待

async def server(port_num):
    logging.info('Starting server on port 5000')
    s1 = Server(port_num)
    await s1.receive()

我做错了什么?

您需要导入并使用 zmq.asyncio 来创建 ZeroMQ 上下文。

import asyncio
import zmq
import zmq.asyncio

ctx = zmq.asyncio.Context()