使用 kombu 时出现 ConcurrentObjectUseError

ConcurrentObjectUseError in gevent when using kombu

我在我的代码中使用了 kombu,有时我从 kombu 的生产者的发布方法中得到以下异常。我认为它在压力条件下重现得更多,所以可能是多线程问题,异常描述似乎也指出了这一点。

使用 python 2.7.18、kombu 4.6.11、amqp 2.6.1、gevent 20.6.2

如有任何帮助,我将不胜感激。谢谢!!

我也看到了这些可能相关的页面,尽管我无法从中推断出我应该做什么:

异常:

Traceback (most recent call last):
File "C:\Code\A\home_common\rabbitmq_common.py", line 168, in send_data_message
self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)
File "C:\Python27\lib\site-packages\kombu\messaging.py", line 181, in publish
exchange_name, declare,
File "C:\Python27\lib\site-packages\kombu\connection.py", line 533, in _ensured
return fun(*args, **kwargs)
File "C:\Python27\lib\site-packages\kombu\messaging.py", line 203, in _publish
mandatory=mandatory, immediate=immediate,
File "C:\Python27\lib\site-packages\amqp\channel.py", line 1766, in _basic_publish
(0, exchange, routing_key, mandatory, immediate), msg
File "C:\Python27\lib\site-packages\amqp\abstract_channel.py", line 59, in send_method
conn.frame_writer(1, self.channel_id, sig, args, content)
File "C:\Python27\lib\site-packages\amqp\method_framing.py", line 189, in write_frame
write(view[:offset])
File "C:\Python27\lib\site-packages\amqp\transport.py", line 305, in write
self._write(s)
File "C:\Python27\lib\site-packages\gevent_socket2.py", line 383, in sendall
return _socketcommon._sendall(self, data_memory, flags)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py", line 392, in _sendall
timeleft = __send_chunk(socket, chunk, flags, timeleft, end)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py", line 321, in __send_chunk
data_sent += socket.send(chunk, flags)
File "C:\Python27\lib\site-packages\gevent_socket2.py", line 369, in send
self._wait(self._write_event)
File "src\gevent
_hub_primitives.py", line 317, in gevent._gevent_c_hub_primitives.wait_on_socket
File "src\gevent
_hub_primitives.py", line 322, in gevent._gevent_c_hub_primitives.wait_on_socket
File "src\gevent
_hub_primitives.py", line 297, in gevent._gevent_c_hub_primitives._primitive_wait
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x0598E810>>

供将来参考 - 似乎已通过锁定所有可能联系兔子服务器的 kombu 调用解决了这个问题,例如:

with self._kombu_lock:
    self.producer.publish(data_message, routing_key=destination, exchange=self.dataDirectExchange, headers={'source': source}, content_encoding='binary', content_type='application/octet-stream', retry=True)