如何正确使用内存传输进行单元测试

How to properly use the in-memory transport for unit tests

我正在尝试编写一些测试,在这些测试中我向队列生成消息并查看消息是否在应用程序中被正确使用和处理。

为此,我正在研究 kombu 库,尤其是内存中的传输实现。

我仍然无法让它工作,生成的消息被消耗了。

因此,我的问题是,是否有人可以提供一个简单的单元测试来生成和使用内存中的消息

您需要根据您要测试的代码调整它,但您要查找的基本内容是内存控制器的 amqp URI,即 'memory://'。作为一个非常简单的例子:

conn = kombu.Connection("memory://")
queue = conn.SimpleQueue('myqueue')
queue.put('test')
msg = queue.get(timeout=1)
msg.ack()
print(msg.payload)