RabbitMQ + kombu:write/read一次性使用随机名称的队列
RabbitMQ + kombu: write/read to one-time use queues with random names
我刚开始使用消息交换,在寻找适合该任务的手册时遇到了问题。
我需要组织队列池,这样:
生产者创建一些随机的空队列并在那里写入所有消息包(通常为 100 条消息)。
消费者找到非空非锁队列并从中读取直到
空了删了再找下一个
所以我的任务是将消息打包处理,我了解如何在一个队列中使用相同的键生成和使用消息,但找不到如何使用队列池。
我们可以并行地有多个生产者和消费者运行,但是他们中的哪一个发送给谁是无关紧要的。 我们不需要也永远不能link特定的生产者和特定的消费者。
一般任务:我们有很多客户端要接收推送通知,我们通过一些参数将推送分组以便稍后作为组处理,所以这样的组应该在一个队列中RabbitMQ作为一个组来生产和消费,但每个组都独立于其他组。
非常感谢 Hannu 的帮助: 他简单而强大的解决方案的关键思想是我们可以拥有一个已知名称的持久队列,生产者将在其中写入创建队列的名称,消费者将从那里读取这些名称。
为了使他的解决方案更具可读性和更容易工作 在我的个人任务中,我将 producer 中的 publish_data() 分为两个功能 - 一个 make随机队列并将其写入 control_queue 另一个接收此 random_queue 并用消息填充它。类似的想法对消费者有好处——一个函数处理队列,另一个函数将被调用来处理消息本身。
我做过类似的事情,但用的是 Pika。我不得不为示例清理和组合旧代码片段。它可能不是很复杂(这绝对是我使用它编写的第一个代码片段),但这就是我解决它的方法。基本上我会设置一个已知名称的控制队列。
发布者将为一组消息创建一个随机队列名称,将 N 条消息转储到其中(在我的例子中是编号 1-42),然后 post 将队列名称转储到控制队列。然后消费者收到这个队列名称,绑定到它,读取消息直到队列为空,然后删除队列。
这使事情变得相对简单,因为发布者不需要弄清楚他们可以在哪里发布他们的数据组(每个队列都是新的,具有随机名称)。接收者无需担心超时或 "all done" 消息,因为只有当一组数据已写入队列并且每条消息都在那里等待时,接收者才会收到队列名称。
也无需修补锁或信号或任何其他会使事情复杂化的东西。您可以拥有任意数量的消费者和生产者。当然,使用交换器和路由键,可能会有不同的消费者组来完成不同的任务等。
出版商
from kombu import Connection
import uuid
from time import sleep
def publish_data(conn):
random_name= "q" + str(uuid.uuid4()).replace("-", "")
random_queue = conn.SimpleQueue(random_name)
for i in xrange(0, 42):
random_queue.put(i)
random_queue.close()
return random_name
with Connection('amqp://guest:guest@localhost:5672//') as conn:
control_queue = conn.SimpleQueue('control_queue')
_a = 0
while True:
y_name = publish_data(conn)
message = y_name
control_queue.put(message)
print('Sent: {0}'.format(message))
_a += 1
sleep(0.3)
if _a > 20:
break
control_queue.close()
消费者
from Queue import Empty
from kombu import Connection, Queue
def process_msg(foo):
print str(foo)
with Connection("amqp://guest:guest@localhost:5672//") as _conn:
sub_queue = _conn.SimpleQueue(str(foo))
while True:
try:
_msg = sub_queue.get(block=False)
print _msg.payload
_msg.ack()
except Empty:
break
sub_queue.close()
chan = _conn.channel()
dq = Queue(name=str(foo), exchange="")
bdq = dq(chan)
bdq.delete()
with Connection('amqp://guest:guest@localhost:5672//') as conn:
rec = conn.SimpleQueue('control_queue')
while True:
msg = rec.get(block=True)
entry = msg.payload
msg.ack()
process_msg(entry)
我刚开始使用消息交换,在寻找适合该任务的手册时遇到了问题。
我需要组织队列池,这样:
生产者创建一些随机的空队列并在那里写入所有消息包(通常为 100 条消息)。
消费者找到非空非锁队列并从中读取直到 空了删了再找下一个
所以我的任务是将消息打包处理,我了解如何在一个队列中使用相同的键生成和使用消息,但找不到如何使用队列池。
我们可以并行地有多个生产者和消费者运行,但是他们中的哪一个发送给谁是无关紧要的。 我们不需要也永远不能link特定的生产者和特定的消费者。
一般任务:我们有很多客户端要接收推送通知,我们通过一些参数将推送分组以便稍后作为组处理,所以这样的组应该在一个队列中RabbitMQ作为一个组来生产和消费,但每个组都独立于其他组。
非常感谢 Hannu 的帮助: 他简单而强大的解决方案的关键思想是我们可以拥有一个已知名称的持久队列,生产者将在其中写入创建队列的名称,消费者将从那里读取这些名称。
为了使他的解决方案更具可读性和更容易工作 在我的个人任务中,我将 producer 中的 publish_data() 分为两个功能 - 一个 make随机队列并将其写入 control_queue 另一个接收此 random_queue 并用消息填充它。类似的想法对消费者有好处——一个函数处理队列,另一个函数将被调用来处理消息本身。
我做过类似的事情,但用的是 Pika。我不得不为示例清理和组合旧代码片段。它可能不是很复杂(这绝对是我使用它编写的第一个代码片段),但这就是我解决它的方法。基本上我会设置一个已知名称的控制队列。
发布者将为一组消息创建一个随机队列名称,将 N 条消息转储到其中(在我的例子中是编号 1-42),然后 post 将队列名称转储到控制队列。然后消费者收到这个队列名称,绑定到它,读取消息直到队列为空,然后删除队列。
这使事情变得相对简单,因为发布者不需要弄清楚他们可以在哪里发布他们的数据组(每个队列都是新的,具有随机名称)。接收者无需担心超时或 "all done" 消息,因为只有当一组数据已写入队列并且每条消息都在那里等待时,接收者才会收到队列名称。
也无需修补锁或信号或任何其他会使事情复杂化的东西。您可以拥有任意数量的消费者和生产者。当然,使用交换器和路由键,可能会有不同的消费者组来完成不同的任务等。
出版商
from kombu import Connection
import uuid
from time import sleep
def publish_data(conn):
random_name= "q" + str(uuid.uuid4()).replace("-", "")
random_queue = conn.SimpleQueue(random_name)
for i in xrange(0, 42):
random_queue.put(i)
random_queue.close()
return random_name
with Connection('amqp://guest:guest@localhost:5672//') as conn:
control_queue = conn.SimpleQueue('control_queue')
_a = 0
while True:
y_name = publish_data(conn)
message = y_name
control_queue.put(message)
print('Sent: {0}'.format(message))
_a += 1
sleep(0.3)
if _a > 20:
break
control_queue.close()
消费者
from Queue import Empty
from kombu import Connection, Queue
def process_msg(foo):
print str(foo)
with Connection("amqp://guest:guest@localhost:5672//") as _conn:
sub_queue = _conn.SimpleQueue(str(foo))
while True:
try:
_msg = sub_queue.get(block=False)
print _msg.payload
_msg.ack()
except Empty:
break
sub_queue.close()
chan = _conn.channel()
dq = Queue(name=str(foo), exchange="")
bdq = dq(chan)
bdq.delete()
with Connection('amqp://guest:guest@localhost:5672//') as conn:
rec = conn.SimpleQueue('control_queue')
while True:
msg = rec.get(block=True)
entry = msg.payload
msg.ack()
process_msg(entry)