Rabbitmq 远程调用 Pika
Rabbitmq remote call with Pika
我是 rabbitmq 的新手,正在尝试通过本教程 (https://www.rabbitmq.com/tutorials/tutorial-six-python.html) 弄清楚如何让客户端请求服务器提供有关内存和 CPU 利用率的信息。
所以客户端请求 CPU 和内存(我相信我需要两个队列)并且服务器响应这些值。
在这种情况下,是否可以使用 Python.
中的 Pika 库简单地创建 client.py
和 server.py
如果您还没有,我建议您遵循第一个 RabbitMQ tutorials。 RPC 示例建立在前面示例中涵盖的概念(直接队列、独占队列、确认等)的基础上。
教程中提出的 RPC 解决方案至少需要两个队列,具体取决于您要使用的客户端数量:
- 一个直接队列(
rpc_queue
),用于将请求从客户端发送到服务器。
- 每个客户端一个独占队列,用于接收响应。
request/response周期:
- 客户端向
rpc_queue
发送消息。每条消息包含一个 reply_to
属性,其中包含服务器应回复的客户端独占队列的名称,以及一个 correlation_id
属性,这只是一个唯一的 id用于跟踪请求。
- 服务器在
rpc_queue
上等待消息。当消息到达时,它准备响应,将 correlation_id
添加到新消息,并将其发送到 reply_to
消息中定义的队列 属性.
- 客户端在其独占队列中等待,直到找到最初生成的
correlation_id
的消息。
直接跳到您的问题,要做的第一件事是定义您要在响应中使用的消息格式。您可以使用 JSON、msgpack 或任何其他序列化库。例如,如果使用 JSON,一条消息可能如下所示:
{
"cpu": 1.2,
"memory": 0.3
}
然后,在您的 server.py
上:
def on_request(channel, method, props, body):
response = {'cpu': current_cpu_usage(),
'memory': current_memory_usage()}
properties = pika.BasicProperties(correlation_id=props.correlation_id)
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=properties,
body=json.dumps(response))
channel.basic_ack(delivery_tag=method.delivery_tag)
# ...
在你的 client.py
上:
class ResponseTimeout(Exception): pass
class Client:
# similar constructor as `FibonacciRpcClient` from tutorial...
def on_response(self, channel, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = json.loads(body.decode())
def call(self, timeout=2):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.correlation_id),
body='')
start_time = time.time()
while self.response is None:
if (start_time + timeout) < time.time():
raise ResponseTimeout()
self.connection.process_data_events()
return self.response
如您所见,代码与原始代码几乎相同 FibonacciRpcClient
。主要区别是:
- 我们使用 JSON 作为消息的数据格式。
- 我们的客户端
call()
方法不需要 body
参数(没有任何东西可以发送到服务器)
- 我们会处理响应超时(如果服务器出现故障,或者它没有回复我们的消息)
不过,这里还有很多地方需要改进:
- 没有错误处理:例如,如果客户端"forgets"发送一个
reply_to
队列,我们的服务器会崩溃,并会在重启时再次崩溃(损坏的消息将无限重新排队只要我们的服务器不承认它)
- 我们不处理断开的连接(没有重新连接机制)
- ...
您也可以考虑用 publish/subscribe 模式替换 RPC 方法;这样,服务器每隔 X 时间间隔简单地广播其 CPU/memory 状态,一个或多个客户端接收更新。
我是 rabbitmq 的新手,正在尝试通过本教程 (https://www.rabbitmq.com/tutorials/tutorial-six-python.html) 弄清楚如何让客户端请求服务器提供有关内存和 CPU 利用率的信息。
所以客户端请求 CPU 和内存(我相信我需要两个队列)并且服务器响应这些值。
在这种情况下,是否可以使用 Python.
中的 Pika 库简单地创建client.py
和 server.py
如果您还没有,我建议您遵循第一个 RabbitMQ tutorials。 RPC 示例建立在前面示例中涵盖的概念(直接队列、独占队列、确认等)的基础上。
教程中提出的 RPC 解决方案至少需要两个队列,具体取决于您要使用的客户端数量:
- 一个直接队列(
rpc_queue
),用于将请求从客户端发送到服务器。 - 每个客户端一个独占队列,用于接收响应。
request/response周期:
- 客户端向
rpc_queue
发送消息。每条消息包含一个reply_to
属性,其中包含服务器应回复的客户端独占队列的名称,以及一个correlation_id
属性,这只是一个唯一的 id用于跟踪请求。 - 服务器在
rpc_queue
上等待消息。当消息到达时,它准备响应,将correlation_id
添加到新消息,并将其发送到reply_to
消息中定义的队列 属性. - 客户端在其独占队列中等待,直到找到最初生成的
correlation_id
的消息。
直接跳到您的问题,要做的第一件事是定义您要在响应中使用的消息格式。您可以使用 JSON、msgpack 或任何其他序列化库。例如,如果使用 JSON,一条消息可能如下所示:
{
"cpu": 1.2,
"memory": 0.3
}
然后,在您的 server.py
上:
def on_request(channel, method, props, body):
response = {'cpu': current_cpu_usage(),
'memory': current_memory_usage()}
properties = pika.BasicProperties(correlation_id=props.correlation_id)
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=properties,
body=json.dumps(response))
channel.basic_ack(delivery_tag=method.delivery_tag)
# ...
在你的 client.py
上:
class ResponseTimeout(Exception): pass
class Client:
# similar constructor as `FibonacciRpcClient` from tutorial...
def on_response(self, channel, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = json.loads(body.decode())
def call(self, timeout=2):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.correlation_id),
body='')
start_time = time.time()
while self.response is None:
if (start_time + timeout) < time.time():
raise ResponseTimeout()
self.connection.process_data_events()
return self.response
如您所见,代码与原始代码几乎相同 FibonacciRpcClient
。主要区别是:
- 我们使用 JSON 作为消息的数据格式。
- 我们的客户端
call()
方法不需要body
参数(没有任何东西可以发送到服务器) - 我们会处理响应超时(如果服务器出现故障,或者它没有回复我们的消息)
不过,这里还有很多地方需要改进:
- 没有错误处理:例如,如果客户端"forgets"发送一个
reply_to
队列,我们的服务器会崩溃,并会在重启时再次崩溃(损坏的消息将无限重新排队只要我们的服务器不承认它) - 我们不处理断开的连接(没有重新连接机制)
- ...
您也可以考虑用 publish/subscribe 模式替换 RPC 方法;这样,服务器每隔 X 时间间隔简单地广播其 CPU/memory 状态,一个或多个客户端接收更新。