线程中的 Pika 异步发布者

Pika asynchronous publisher in a thread

我关注了 pika 的 Asynchronous publisher example 并试图 运行 它的 self._connection.ioloop.start() 在一个单独的线程中。到目前为止,我设法为主线程使用队列来添加要发布的消息。但是让发布者线程从队列中获取消息的唯一方法并不令人满意。我用了类似

的东西
try:
    message = self._queue.get(True, 1)
    self._channel.basic_publish(body=message, exchange=self._exchange, routing_key='example.text')
except queue.Empty:
    pass
finally:
    self._connection.add_timeout(0.0001, self.publish_message)

一定有更好的方法来做到这一点,对吧?重要的是要注意我在 Windows 中使用 Python 3.6.4 并且 pika.SelectConnection 选择的 IO 循环似乎非常有限...

编辑:我刚刚发现如何使用 adapters.AsyncioConnection 而不是 SelectConnection。所以现在我可以用 self._connection.loop.call_soon(self.publish_message) 替换 self._connection.add_timeout(0.0001, self.publish_message)。 这会产生非常奇怪的结果:消息似乎每秒都会缓冲和发送一次。我是 Python 的新手,所以非常感谢您提供一些见解!

这个问题的正确答案,如果它仍然出现在搜索结果中,至少升级到 Pika v0.12 并利用方法 add_callback_threadsafe 可用于各种连接适配器。 Here 是一个例子。