用kombu为rabbitmq实现RPC客户端及其直接回复功能
Implementing an RPC client with kombu for RabbitMQ and its direct reply-to feature
我正在用 kombu 实现一个 rpc 框架,以便与 rabbit 一起使用
我想使用 RabbitMQ 的直接回复(direct reply-to)功能,但我找不到用 kombu 实现它的方法。
客户端在发布消息之前需要先在 'amq.rabbitmq.reply-to' 队列上消费,并且生产者和消费者必须使用同一个通道。
我还需要使用生产者池(或某种连接池),因为客户端是在线程环境中创建的。
到目前为止我写了下面这段代码:RabbitMQ 不会再报前提条件(PRECONDITION)错误(如果我删除消费者部分,它就会报错),但生产者没有发布出任何消息!
class KombuRpcClient(RpcClientBase):
    """RPC client that publishes requests through a pooled kombu producer.

    Replies are expected on a consumer opened on the producer's own channel
    and are handed to ``_on_message``, which stores them in ``self.future``.

    NOTE(review): this variant declares the reply queue under the name
    ``'direct_reply'`` and publishes with ``immediate=True``; both look
    incompatible with RabbitMQ's direct reply-to feature -- confirm against
    the broker documentation before relying on it.
    """

    def __init__(self, params):
        """Store connection settings and build the reply queue description.

        ``params`` is either an ``RpcConnectionProperties`` instance, used
        as-is, or a mapping with ``host``, ``username``, ``password`` and an
        optional ``vhost`` (``'/'`` when the key is absent).  The port is
        fixed to 5672 when a mapping is given.
        """
        self.future = Queue.Queue()
        self.logger = logger
        if not isinstance(params, RpcConnectionProperties):
            # Plain mapping: translate it into the properties object.
            params = RpcConnectionProperties(
                host=params.get('host'),
                port=5672,
                username=params.get('username'),
                password=params.get('password'),
                vhost=params.get('vhost', '/'),
            )
        self.rpc_connection_properties = params
        self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
        self.reply_queue = KombuQueue(
            'direct_reply',
            exchange=default_exchange,
            routing_key='amq.rabbitmq.reply-to',
        )

    def call(self, exchange, key, msg, no_response=False, timeout=5):
        """Publish ``msg`` and wait up to ``timeout`` seconds for one reply.

        When ``exchange`` is not None the routing key becomes
        ``exchange + ':' + key``.  The reply taken from ``self.future`` is
        printed; nothing is returned.  ``no_response`` is accepted but not
        used.

        NOTE(review): nothing here drains connection events, so the reply
        callback presumably never fires before the wait expires -- verify.
        """
        connection = Connection(self.amqp_url)
        routing_key = key if exchange is None else exchange + ':' + key
        pool = producers_pool[connection]
        with pool.acquire(block=True) as producer:
            # The consumer shares the producer's channel.
            reply_consumer = producer.channel.Consumer(
                queues=[self.reply_queue],
                no_ack=True,
                callbacks=[self._on_message],
                accept=['ujson'],
            )
            with reply_consumer as consumer:
                producer.publish(
                    msg,
                    exchange=default_exchange,
                    routing_key=routing_key,
                    immediate=True,
                    serializer='ujson',
                    reply_to=self.reply_queue.routing_key,
                )
                consumer.consume()
        res = self.future.get(block=True, timeout=timeout)
        print(res)

    def cast(self, exchange, key, msg):
        """Placeholder for a fire-and-forget publish; currently does nothing."""

    def _on_message(self, body, message):
        """Consumer callback: print the decoded body and queue it as the reply."""
        print(body)
        self.future.put(body)
在 wireshark 的帮助下,我意识到有时 rabbitmq 会响应 PRECONDITION 错误,但 kombu 不会引发异常,我不知道为什么!
不管怎样,这段代码现在可以工作了:
class KombuRpcClient(RpcClientBase):
    """RPC client using kombu and RabbitMQ's direct reply-to pseudo-queue.

    Requests are published through a pooled producer.  Replies are consumed
    from ``amq.rabbitmq.reply-to`` on the producer's own channel and handed
    to ``_on_message``, which stores them in ``self.future``.
    """

    def __init__(self, params):
        """Store connection settings and build the reply queue description.

        ``params`` is either an ``RpcConnectionProperties`` instance, used
        as-is, or a mapping with ``host``, ``username``, ``password`` and an
        optional ``vhost`` (``'/'`` when the key is absent).  The port is
        fixed to 5672 when a mapping is given.
        """
        self.future = Queue.Queue()
        self.logger = logger
        if isinstance(params, RpcConnectionProperties):
            self.rpc_connection_properties = params
        else:
            self.rpc_connection_properties = RpcConnectionProperties(
                host=params.get('host'),
                port=5672,
                username=params.get('username'),
                password=params.get('password'),
                # dict.get with a default replaces the Python-2-only has_key().
                vhost=params.get('vhost', '/'),
            )
        self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
        self.reply_queue = KombuQueue(
            'amq.rabbitmq.reply-to',
            exchange=default_exchange,
            routing_key='amq.rabbitmq.reply-to',
        )

    @staticmethod
    def _routing_key(exchange, key):
        """Return ``key`` prefixed with ``exchange + ':'`` unless ``exchange`` is None."""
        if exchange is None:
            return key
        return exchange + ':' + key

    def call(self, exchange, key, msg, no_response=False, timeout=5):
        """Publish ``msg`` and return the body of the reply.

        Args:
            exchange: Optional prefix joined to ``key`` with ``':'``.
            key: Routing key of the request.
            msg: Payload, serialized with the ``ujson`` serializer.
            no_response: Accepted for interface compatibility; not used.
            timeout: Seconds to wait for the reply (``None`` waits forever).

        Returns:
            The decoded reply body.

        Raises:
            Queue.Empty: If no reply arrives within ``timeout`` seconds.
        """
        import socket  # local import: only needed to recognise drain timeouts

        connection = Connection(self.amqp_url)
        routing_key = self._routing_key(exchange, key)
        with producers_pool[connection].acquire(block=True) as producer:
            consumer = producer.channel.Consumer(
                queues=[self.reply_queue],
                no_ack=True,
                auto_declare=True,
                callbacks=[self._on_message],
                accept=['ujson'],
            )
            # Entering the consumer starts consuming (before publishing, on
            # the producer's channel); leaving it cancels the consumer so the
            # pooled channel is returned without a leftover reply consumer.
            with consumer:
                producer.publish(
                    msg,
                    serializer='ujson',
                    exchange=default_exchange,
                    routing_key=routing_key,
                    reply_to='amq.rabbitmq.reply-to',
                )
                try:
                    # Bound the wait so ``timeout`` is honoured.
                    consumer.connection.drain_events(timeout=timeout)
                except socket.timeout:
                    # Report a missing reply the same way an empty future does.
                    raise Queue.Empty()
        res = self.future.get(block=True, timeout=timeout)
        # NOTE(review): callers receive the raw body; wrap it in a Response
        # here if the legacy client contract requires one -- confirm.
        return res

    def cast(self, exchange, key, msg):
        """Publish ``msg`` without waiting for any reply."""
        connection = Connection(self.amqp_url)
        routing_key = self._routing_key(exchange, key)
        with producers_pool[connection].acquire(block=True) as producer:
            producer.publish(
                msg,
                serializer='ujson',
                exchange=default_exchange,
                routing_key=routing_key,
            )

    def _on_message(self, body, message):
        """Consumer callback: print the decoded body and queue it as the reply."""
        print(body)
        self.future.put(body)
顺便说一句:关于变量名,我很抱歉,这应该替换旧的 stomp rpc 客户端,所以我不得不保留名称以保持兼容性
我正在用 kombu 实现一个 rpc 框架,以便与 rabbit 一起使用。我想使用 RabbitMQ 的直接回复(direct reply-to)功能,但我找不到用 kombu 实现它的方法。
客户端在发布消息之前需要先在 'amq.rabbitmq.reply-to' 队列上消费,并且生产者和消费者必须使用同一个通道。我还需要使用生产者池(或某种连接池),因为客户端是在线程环境中创建的。
到目前为止我写了下面这段代码:RabbitMQ 不会再报前提条件(PRECONDITION)错误(如果我删除消费者部分,它就会报错),但生产者没有发布出任何消息!
class KombuRpcClient(RpcClientBase):
    """RPC client that publishes requests through a pooled kombu producer.

    Replies are expected on a consumer opened on the producer's own channel
    and are handed to ``_on_message``, which stores them in ``self.future``.

    NOTE(review): this variant declares the reply queue under the name
    ``'direct_reply'`` and publishes with ``immediate=True``; both look
    incompatible with RabbitMQ's direct reply-to feature -- confirm against
    the broker documentation before relying on it.
    """

    def __init__(self, params):
        """Store connection settings and build the reply queue description.

        ``params`` is either an ``RpcConnectionProperties`` instance, used
        as-is, or a mapping with ``host``, ``username``, ``password`` and an
        optional ``vhost`` (``'/'`` when the key is absent).  The port is
        fixed to 5672 when a mapping is given.
        """
        self.future = Queue.Queue()
        self.logger = logger
        if not isinstance(params, RpcConnectionProperties):
            # Plain mapping: translate it into the properties object.
            params = RpcConnectionProperties(
                host=params.get('host'),
                port=5672,
                username=params.get('username'),
                password=params.get('password'),
                vhost=params.get('vhost', '/'),
            )
        self.rpc_connection_properties = params
        self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
        self.reply_queue = KombuQueue(
            'direct_reply',
            exchange=default_exchange,
            routing_key='amq.rabbitmq.reply-to',
        )

    def call(self, exchange, key, msg, no_response=False, timeout=5):
        """Publish ``msg`` and wait up to ``timeout`` seconds for one reply.

        When ``exchange`` is not None the routing key becomes
        ``exchange + ':' + key``.  The reply taken from ``self.future`` is
        printed; nothing is returned.  ``no_response`` is accepted but not
        used.

        NOTE(review): nothing here drains connection events, so the reply
        callback presumably never fires before the wait expires -- verify.
        """
        connection = Connection(self.amqp_url)
        routing_key = key if exchange is None else exchange + ':' + key
        pool = producers_pool[connection]
        with pool.acquire(block=True) as producer:
            # The consumer shares the producer's channel.
            reply_consumer = producer.channel.Consumer(
                queues=[self.reply_queue],
                no_ack=True,
                callbacks=[self._on_message],
                accept=['ujson'],
            )
            with reply_consumer as consumer:
                producer.publish(
                    msg,
                    exchange=default_exchange,
                    routing_key=routing_key,
                    immediate=True,
                    serializer='ujson',
                    reply_to=self.reply_queue.routing_key,
                )
                consumer.consume()
        res = self.future.get(block=True, timeout=timeout)
        print(res)

    def cast(self, exchange, key, msg):
        """Placeholder for a fire-and-forget publish; currently does nothing."""

    def _on_message(self, body, message):
        """Consumer callback: print the decoded body and queue it as the reply."""
        print(body)
        self.future.put(body)
在 wireshark 的帮助下,我意识到有时 rabbitmq 会响应 PRECONDITION 错误,但 kombu 不会引发异常,我不知道为什么! 不管怎样,这段代码现在可以工作了:
class KombuRpcClient(RpcClientBase):
    """RPC client using kombu and RabbitMQ's direct reply-to pseudo-queue.

    Requests are published through a pooled producer.  Replies are consumed
    from ``amq.rabbitmq.reply-to`` on the producer's own channel and handed
    to ``_on_message``, which stores them in ``self.future``.
    """

    def __init__(self, params):
        """Store connection settings and build the reply queue description.

        ``params`` is either an ``RpcConnectionProperties`` instance, used
        as-is, or a mapping with ``host``, ``username``, ``password`` and an
        optional ``vhost`` (``'/'`` when the key is absent).  The port is
        fixed to 5672 when a mapping is given.
        """
        self.future = Queue.Queue()
        self.logger = logger
        if isinstance(params, RpcConnectionProperties):
            self.rpc_connection_properties = params
        else:
            self.rpc_connection_properties = RpcConnectionProperties(
                host=params.get('host'),
                port=5672,
                username=params.get('username'),
                password=params.get('password'),
                # dict.get with a default replaces the Python-2-only has_key().
                vhost=params.get('vhost', '/'),
            )
        self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
        self.reply_queue = KombuQueue(
            'amq.rabbitmq.reply-to',
            exchange=default_exchange,
            routing_key='amq.rabbitmq.reply-to',
        )

    @staticmethod
    def _routing_key(exchange, key):
        """Return ``key`` prefixed with ``exchange + ':'`` unless ``exchange`` is None."""
        if exchange is None:
            return key
        return exchange + ':' + key

    def call(self, exchange, key, msg, no_response=False, timeout=5):
        """Publish ``msg`` and return the body of the reply.

        Args:
            exchange: Optional prefix joined to ``key`` with ``':'``.
            key: Routing key of the request.
            msg: Payload, serialized with the ``ujson`` serializer.
            no_response: Accepted for interface compatibility; not used.
            timeout: Seconds to wait for the reply (``None`` waits forever).

        Returns:
            The decoded reply body.

        Raises:
            Queue.Empty: If no reply arrives within ``timeout`` seconds.
        """
        import socket  # local import: only needed to recognise drain timeouts

        connection = Connection(self.amqp_url)
        routing_key = self._routing_key(exchange, key)
        with producers_pool[connection].acquire(block=True) as producer:
            consumer = producer.channel.Consumer(
                queues=[self.reply_queue],
                no_ack=True,
                auto_declare=True,
                callbacks=[self._on_message],
                accept=['ujson'],
            )
            # Entering the consumer starts consuming (before publishing, on
            # the producer's channel); leaving it cancels the consumer so the
            # pooled channel is returned without a leftover reply consumer.
            with consumer:
                producer.publish(
                    msg,
                    serializer='ujson',
                    exchange=default_exchange,
                    routing_key=routing_key,
                    reply_to='amq.rabbitmq.reply-to',
                )
                try:
                    # Bound the wait so ``timeout`` is honoured.
                    consumer.connection.drain_events(timeout=timeout)
                except socket.timeout:
                    # Report a missing reply the same way an empty future does.
                    raise Queue.Empty()
        res = self.future.get(block=True, timeout=timeout)
        # NOTE(review): callers receive the raw body; wrap it in a Response
        # here if the legacy client contract requires one -- confirm.
        return res

    def cast(self, exchange, key, msg):
        """Publish ``msg`` without waiting for any reply."""
        connection = Connection(self.amqp_url)
        routing_key = self._routing_key(exchange, key)
        with producers_pool[connection].acquire(block=True) as producer:
            producer.publish(
                msg,
                serializer='ujson',
                exchange=default_exchange,
                routing_key=routing_key,
            )

    def _on_message(self, body, message):
        """Consumer callback: print the decoded body and queue it as the reply."""
        print(body)
        self.future.put(body)
顺便说一句:关于变量名,我很抱歉,这应该替换旧的 stomp rpc 客户端,所以我不得不保留名称以保持兼容性