在处理完整个数据包之前,不要让 stomp.py 从 ActiveMQ 中删除数据包

Don't let stomp.py delete packet from ActiveMQ untill the whole packet is processed

我正在尝试使用 stomp.py 从 activemq 获取消息,然后对其进行一些处理。但是有一种情况是某些消息的处理失败并且该消息丢失。

如何在消息完全处理之前防止删除消息?

例如,在我的代码中,当队列中有新条目时,将调用 on_message 函数并开始处理,但如果它在消息之间被中断丢失了。我该如何阻止它?

这是我的代码:

conn = stomp.Connection([(host, 61613)])
conn.set_listener('ML', MyListener())
conn.start()
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination=/queue/someque, id=1, ack='auto')
print "running"
while 1:
    print 'waiting'
    time.sleep(2.5)

这是我的 Listener class:

class MyListener(stomp.ConnectionListener):
    def on_message(self, headers, message):
        print headers
        print message
        do_something()

提前致谢。

问题似乎是您使用的是 'auto' ack 模式,因此消息将在代理传递给客户端之前得到确认,这意味着即使您未能处理它,它也会迟到经纪人方面已经忘记了。您需要使用 'client' ack 或 'client-individual' ack 模式,如 STOMP specification 中所述。使用其中一种客户端确认模式,您可以控制一条或多条消息何时被代理实际确认和丢弃。