使用鼠兔异步(扭曲)的更简单方法?
Easier way to use pika asynchronous (twisted)?
这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定什么是解决问题的最佳方法。
该程序相当简单,它只是监听警报事件,然后将事件放入 rabbitmq 队列中,但我对程序的体系结构感到困惑。
如果我打开、发布然后关闭每个事件的连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...
如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,而且我的如果与 rabbitmq 代理的连接结束,程序将不知道要做什么。
阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,它非常适合我,因为我只是从基本套接字重写了所有连接内容以使用 Twisted(我真的很喜欢他们的高级方法).但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。
在一个完美的世界中,我将能够 运行 与“警报服务器”在同一个反应器中的服务并调用一个方法来发布消息。但是我很难理解代码。有没有和 pika 一起工作过的人能给我指出更好的方向,或者甚至告诉我是否有更简单的方法?
好吧,我会 post 对我有用的东西。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。
首先,我决定放弃 Twisted 并使用 Asyncio(不是针对个人的,我只是想使用它,因为它已经在 python 中),甚至 pika 有一个使用异步的好例子,我尝试并发现使用 aio_pika.
更容易
我最终得到了 2 个主要函数。一个用于发布者,另一个用于订阅者。
下面是我的代码,适合我...
# -*- coding: utf-8 -*-
import asyncio
import aio_pika
from myapp import conf
QUEUE_SEND = []
def add_queue_send(msg):
"""Add MSG to QUEUE
Args:
msg (string): JSON
"""
QUEUE_SEND.append(msg)
def build_url(amqp_user, amqp_pass, virtual_host):
"""Build Auth URL
Args:
amqp_user (str): User name
amqp_pass (str): Password
virtual_host (str): Virtual Host
Returns:
str: AMQP URL
"""
return ''.join(['amqps://',
amqp_user, ':', amqp_pass,
'@', conf.get('amqp_host'), '/', virtual_host,
'?cafile=', conf.get('ca_cert'),
'&keyfile=', conf.get('client_key'),
'&certfile=', conf.get('client_cert'),
'&no_verify_ssl=0'])
async def process_message(message: aio_pika.IncomingMessage):
"""Read a new message
Args:
message (aio_pika.IncomingMessage): Mensagem
"""
async with message.process():
# TODO: Do something with the new message
await asyncio.sleep(1)
async def consumer(url):
"""Keep listening to a MQTT queue
Args:
url (str): URL
Returns:
aio_pika.Connection: Conn?
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
# Max concurrent messages?
await channel.set_qos(prefetch_count=100)
# Queue
queue = await channel.declare_queue(conf.get('amqp_queue_client'))
# What call when a new message is received
await queue.consume(process_message)
# Returns the connection?
return connection
async def publisher(url):
"""Send messages from the queue.
Args:
url (str): URL de autenticação
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
while True:
if QUEUE_SEND:
# If the list (my queue) is not empty
msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
await channel.default_exchange.publish(msg, routing_key='queue')
else:
# Just wait
await asyncio.sleep(1)
await connection.close()
我开始都使用``loop.create_task```。
正如我所说。它对我有点用(即使我的代码的另一部分仍然有问题)但我不想让这个问题悬而未决,因为大多数人都会有同样的问题。
如果您知道更好的方法或更优雅的方法,请分享。
这是我使用 rabbitmq 的第一个项目,我完全迷失了,因为我不确定什么是解决问题的最佳方法。
该程序相当简单,它只是监听警报事件,然后将事件放入 rabbitmq 队列中,但我对程序的体系结构感到困惑。
如果我打开、发布然后关闭每个事件的连接,我会增加很多延迟,并且会传输不必要的包(甚至比平时更多,因为我使用的是 TLS)...
如果我保持连接打开,并创建一个发布消息的函数(我只使用一个队列,非常基本),我最终会遇到问题,因为多个事件可能同时发生,而且我的如果与 rabbitmq 代理的连接结束,程序将不知道要做什么。
阅读他们的文档,该解决方案似乎使用了他们的“连接适配器”之一,它非常适合我,因为我只是从基本套接字重写了所有连接内容以使用 Twisted(我真的很喜欢他们的高级方法).但有一个问题。对于几乎不认为自己是“中级”的人来说,他们的 "basic example" 相当复杂。
在一个完美的世界中,我将能够 运行 与“警报服务器”在同一个反应器中的服务并调用一个方法来发布消息。但是我很难理解代码。有没有和 pika 一起工作过的人能给我指出更好的方向,或者甚至告诉我是否有更简单的方法?
好吧,我会 post 对我有用的东西。可能不是最好的选择,但也许它可以帮助遇到同样问题的人。
首先,我决定放弃 Twisted 并使用 Asyncio(不是针对个人的,我只是想使用它,因为它已经在 python 中),甚至 pika 有一个使用异步的好例子,我尝试并发现使用 aio_pika.
更容易我最终得到了 2 个主要函数。一个用于发布者,另一个用于订阅者。 下面是我的代码,适合我...
# -*- coding: utf-8 -*-
import asyncio
import aio_pika
from myapp import conf
QUEUE_SEND = []
def add_queue_send(msg):
"""Add MSG to QUEUE
Args:
msg (string): JSON
"""
QUEUE_SEND.append(msg)
def build_url(amqp_user, amqp_pass, virtual_host):
"""Build Auth URL
Args:
amqp_user (str): User name
amqp_pass (str): Password
virtual_host (str): Virtual Host
Returns:
str: AMQP URL
"""
return ''.join(['amqps://',
amqp_user, ':', amqp_pass,
'@', conf.get('amqp_host'), '/', virtual_host,
'?cafile=', conf.get('ca_cert'),
'&keyfile=', conf.get('client_key'),
'&certfile=', conf.get('client_cert'),
'&no_verify_ssl=0'])
async def process_message(message: aio_pika.IncomingMessage):
"""Read a new message
Args:
message (aio_pika.IncomingMessage): Mensagem
"""
async with message.process():
# TODO: Do something with the new message
await asyncio.sleep(1)
async def consumer(url):
"""Keep listening to a MQTT queue
Args:
url (str): URL
Returns:
aio_pika.Connection: Conn?
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
# Max concurrent messages?
await channel.set_qos(prefetch_count=100)
# Queue
queue = await channel.declare_queue(conf.get('amqp_queue_client'))
# What call when a new message is received
await queue.consume(process_message)
# Returns the connection?
return connection
async def publisher(url):
"""Send messages from the queue.
Args:
url (str): URL de autenticação
"""
connection = await aio_pika.connect_robust(url=url)
# Channel
channel = await connection.channel()
while True:
if QUEUE_SEND:
# If the list (my queue) is not empty
msg = aio_pika.Message(body=QUEUE_SEND.pop().encode())
await channel.default_exchange.publish(msg, routing_key='queue')
else:
# Just wait
await asyncio.sleep(1)
await connection.close()
我开始都使用``loop.create_task```。
正如我所说。它对我有点用(即使我的代码的另一部分仍然有问题)但我不想让这个问题悬而未决,因为大多数人都会有同样的问题。
如果您知道更好的方法或更优雅的方法,请分享。