使用鼠兔异步(扭曲)的更简单方法?

Easier way to use pika asynchronous (twisted)?

这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定什么是解决问题的最佳方法。

该程序相当简单,它只是监听警报事件,然后将事件放入 rabbitmq 队列中,但我对程序的体系结构感到困惑。

如果我打开、发布然后关闭每个事件的连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...

如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,而且我的如果与 rabbitmq 代理的连接结束,程序将不知道要做什么。

阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,它非常适合我,因为我只是从基本套接字重写了所有连接内容以使用 Twisted(我真的很喜欢他们的高级方法).但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。

在一个完美的世界中,我将能够 运行 与“警报服务器”在同一个反应器中的服务并调用一个方法来发布消息。但是我很难理解代码。有没有和 pika 一起工作过的人能给我指出更好的方向,或者甚至告诉我是否有更简单的方法?

好吧,我会 post 对我有用的东西。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。

首先,我决定放弃 Twisted 并使用 Asyncio(不是针对个人的,我只是想使用它,因为它已经在 python 中),甚至 pika 有一个使用异步的好例子,我尝试并发现使用 aio_pika.

更容易

我最终得到了 2 个主要函数。一个用于发布者,另一个用于订阅者。 下面是我的代码,适合我...

# -*- coding: utf-8 -*-

import asyncio
import aio_pika
from myapp import conf

QUEUE_SEND = []


def add_queue_send(msg):
    """Add MSG to QUEUE

    Args:
        msg (string): JSON
    """
    QUEUE_SEND.append(msg)


def build_url(amqp_user, amqp_pass, virtual_host):
    """Build Auth URL

    Args:
        amqp_user (str): User name
        amqp_pass (str): Password
        virtual_host (str): Virtual Host

    Returns:
        str: AMQP URL
    """
    return ''.join(['amqps://',
                    amqp_user, ':', amqp_pass,
                    '@', conf.get('amqp_host'), '/', virtual_host,
                    '?cafile=', conf.get('ca_cert'),
                    '&keyfile=', conf.get('client_key'),
                    '&certfile=', conf.get('client_cert'),
                    '&no_verify_ssl=0'])


async def process_message(message: aio_pika.IncomingMessage):
    """Read a new message

    Args:
        message (aio_pika.IncomingMessage): Mensagem
    """
    async with message.process():
        #   TODO: Do something with the new message
        await asyncio.sleep(1)


async def consumer(url):
    """Keep listening to a MQTT queue

    Args:
        url (str): URL

    Returns:
        aio_pika.Connection: Conn?
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    # Max concurrent messages?
    await channel.set_qos(prefetch_count=100)
    # Queue
    queue = await channel.declare_queue(conf.get('amqp_queue_client'))
    #   What call when a new message is received
    await queue.consume(process_message)
    #   Returns the connection?
    return connection


async def publisher(url):
    """Send messages from the queue.

    Args:
        url (str): URL de autenticação
    """
    connection = await aio_pika.connect_robust(url=url)
    # Channel
    channel = await connection.channel()
    while True:
        if QUEUE_SEND:
            #   If the list (my queue) is not empty
            msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
            await channel.default_exchange.publish(msg, routing_key='queue')
        else:
            #   Just wait
            await asyncio.sleep(1)
    await connection.close()

我开始都使用``loop.create_task```。

正如我所说。它对我有点用(即使我的代码的另一部分仍然有问题)但我不想让这个问题悬而未决,因为大多数人都会有同样的问题。

如果您知道更好的方法或更优雅的方法,请分享。