在发布之前确保存在 AMQP 交换绑定
Ensure that AMQP exchange binding exists before publishing
系统布局
我们有三个系统:
- 一个API端点(发布者和消费者)
- RabbitMQ 服务器
- 主要application/processor(发布者和消费者)
系统1和3都使用Laravel,与RabbitMQ交互使用PHPAMQPLIB
消息路径
系统 1(API 端点)向系统 3 的 RabbitMQ 服务器发送序列化作业以进行处理。然后它立即声明一个新的随机命名的队列,将一个交换器绑定到具有相关 ID 的该队列 - 并开始侦听消息。
与此同时,系统 3 完成了该作业,一旦它完成,就会将该作业的详细信息响应到 RabbitMQ,并在交换中使用相关 ID。
问题和我尝试过的方法
我经常发现这个过程失败了。发送和接收作业,发送响应 - 但系统 1 从未读取此响应,而且我没有看到它在 RabbitMQ 中发布。
我对此进行了一些广泛的调试,但没有找到根本原因。我目前的理论是系统 3 返回响应的速度如此之快,以至于系统 1 甚至还没有声明新的队列和交换绑定。这意味着系统 3 的响应无处可去,结果消失了.这个理论主要基于这样一个事实,即如果我将系统 3 上的作业设置为以较低的频率处理,系统就会变得更加可靠。作业处理得越快,就越不可靠。
问题是:我怎样才能防止这种情况发生?还是我还缺少其他东西?我当然希望在不破坏 Request/Response-pattern.
的情况下快速高效地处理这些作业
我已经记录了两个系统的输出 - 两个系统都使用相同的相关 ID,系统 3 在发布时得到一个 ACK - 而系统 1 有一个声明的队列,没有消息最终会超时。
代码示例 1:发布消息
/**
* Helper method to publish a message to RabbitMQ
*
* @param $exchange
* @param $message
* @param $correlation_id
* @return bool
*/
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
try {
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->set_ack_handler(function (AMQPMessage $message) {
Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
});
$channel->set_nack_handler(function (AMQPMessage $message) {
Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
});
$channel->confirm_select();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
$msg = new AMQPMessage($message);
$channel->basic_publish($msg, $exchange, $correlation_id);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
return true;
} catch (Exception $e) {
return false;
}
}
代码示例 2:等待消息响应
/**
* Helper method to fetch messages from RabbitMQ.
*
* @param $exchange
* @param $correlation_id
* @return mixed
*/
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
list($queue_name, ,) = $channel->queue_declare(
'',
false,
false,
true,
false
);
$channel->queue_bind($queue_name, $exchange, $correlation_id);
$callback = function ($msg) {
return self::$rfcResponse = $msg->body;
};
$channel->basic_consume(
$queue_name,
'',
false,
true,
false,
false,
$callback
);
if (!count($channel->callbacks)) {
Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
}
while (self::$rfcResponse === null && count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
return self::$rfcResponse;
}
感谢您提供的任何建议!
我可能遗漏了一些东西,但是当我读到这篇文章时:
System 1 (the API Endpoint) sends a serialized job to the RabbitMQ Server for System 3 to process. It then immediately declares a new randomly named queue, binds an exchange to that queue with a correlation ID - and starts to listen for messages.
我第一个想到的是"why do you wait until the message is sent before declaring the return queue?"
事实上,我们在这里有一系列单独的步骤:
- 正在生成关联 ID
- 将包含该 ID 的消息发布到交易所以在其他地方处理
- 正在声明一个新队列来接收响应
- 使用关联 ID 将队列绑定到交换器
- 将回调绑定到新队列
- 等待回复
响应要到第 2 步之后才会出现,因此我们希望尽可能晚地进行。唯一不能在此之前的步骤是第 6 步,但在代码中将第 5 步和第 6 步放在一起可能很方便。所以我将代码重新排列为:
- 正在生成关联 ID
- 正在声明一个新队列来接收响应
- 使用关联 ID 将队列绑定到交换器
- 将包含相关 ID 的消息发布到交易所以在其他地方处理
- 将回调绑定到新队列
- 等待回复
这样,无论响应发布多快,它都会被第 2 步中声明的队列拾取,一旦绑定回调并开始等待,您就会处理它。
请注意,没有什么是 readAMQPRouteMessage
知道而 publishAMQPRouteMessage
不知道的,因此您可以在它们之间自由移动代码。当你想从响应队列中消费时,你所需要的只是它的名字,你可以将它保存到一个变量中并传递,或者你自己生成而不是让 RabbitMQ 命名它。例如,您可以根据它正在侦听的相关 ID 来命名它,这样您就可以随时通过简单的字符串操作来弄清楚它是什么,例如"job_response.{$correlation_id}"
系统布局
我们有三个系统:
- 一个API端点(发布者和消费者)
- RabbitMQ 服务器
- 主要application/processor(发布者和消费者)
系统1和3都使用Laravel,与RabbitMQ交互使用PHPAMQPLIB
消息路径
系统 1(API 端点)向系统 3 的 RabbitMQ 服务器发送序列化作业以进行处理。然后它立即声明一个新的随机命名的队列,将一个交换器绑定到具有相关 ID 的该队列 - 并开始侦听消息。
与此同时,系统 3 完成了该作业,一旦它完成,就会将该作业的详细信息响应到 RabbitMQ,并在交换中使用相关 ID。
问题和我尝试过的方法
我经常发现这个过程失败了。发送和接收作业,发送响应 - 但系统 1 从未读取此响应,而且我没有看到它在 RabbitMQ 中发布。
我对此进行了一些广泛的调试,但没有找到根本原因。我目前的理论是系统 3 返回响应的速度如此之快,以至于系统 1 甚至还没有声明新的队列和交换绑定。这意味着系统 3 的响应无处可去,结果消失了.这个理论主要基于这样一个事实,即如果我将系统 3 上的作业设置为以较低的频率处理,系统就会变得更加可靠。作业处理得越快,就越不可靠。
问题是:我怎样才能防止这种情况发生?还是我还缺少其他东西?我当然希望在不破坏 Request/Response-pattern.
的情况下快速高效地处理这些作业我已经记录了两个系统的输出 - 两个系统都使用相同的相关 ID,系统 3 在发布时得到一个 ACK - 而系统 1 有一个声明的队列,没有消息最终会超时。
代码示例 1:发布消息
/**
* Helper method to publish a message to RabbitMQ
*
* @param $exchange
* @param $message
* @param $correlation_id
* @return bool
*/
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
try {
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->set_ack_handler(function (AMQPMessage $message) {
Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
});
$channel->set_nack_handler(function (AMQPMessage $message) {
Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
});
$channel->confirm_select();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
$msg = new AMQPMessage($message);
$channel->basic_publish($msg, $exchange, $correlation_id);
$channel->wait_for_pending_acks();
$channel->close();
$connection->close();
return true;
} catch (Exception $e) {
return false;
}
}
代码示例 2:等待消息响应
/**
* Helper method to fetch messages from RabbitMQ.
*
* @param $exchange
* @param $correlation_id
* @return mixed
*/
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST'),
env('RABBITMQ_PORT'),
env('RABBITMQ_LOGIN'),
env('RABBITMQ_PASSWORD'),
env('RABBITMQ_VHOST')
);
$channel = $connection->channel();
$channel->exchange_declare(
$exchange,
'direct',
false,
false,
false
);
list($queue_name, ,) = $channel->queue_declare(
'',
false,
false,
true,
false
);
$channel->queue_bind($queue_name, $exchange, $correlation_id);
$callback = function ($msg) {
return self::$rfcResponse = $msg->body;
};
$channel->basic_consume(
$queue_name,
'',
false,
true,
false,
false,
$callback
);
if (!count($channel->callbacks)) {
Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
}
while (self::$rfcResponse === null && count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
return self::$rfcResponse;
}
感谢您提供的任何建议!
我可能遗漏了一些东西,但是当我读到这篇文章时:
System 1 (the API Endpoint) sends a serialized job to the RabbitMQ Server for System 3 to process. It then immediately declares a new randomly named queue, binds an exchange to that queue with a correlation ID - and starts to listen for messages.
我第一个想到的是"why do you wait until the message is sent before declaring the return queue?"
事实上,我们在这里有一系列单独的步骤:
- 正在生成关联 ID
- 将包含该 ID 的消息发布到交易所以在其他地方处理
- 正在声明一个新队列来接收响应
- 使用关联 ID 将队列绑定到交换器
- 将回调绑定到新队列
- 等待回复
响应要到第 2 步之后才会出现,因此我们希望尽可能晚地进行。唯一不能在此之前的步骤是第 6 步,但在代码中将第 5 步和第 6 步放在一起可能很方便。所以我将代码重新排列为:
- 正在生成关联 ID
- 正在声明一个新队列来接收响应
- 使用关联 ID 将队列绑定到交换器
- 将包含相关 ID 的消息发布到交易所以在其他地方处理
- 将回调绑定到新队列
- 等待回复
这样,无论响应发布多快,它都会被第 2 步中声明的队列拾取,一旦绑定回调并开始等待,您就会处理它。
请注意,没有什么是 readAMQPRouteMessage
知道而 publishAMQPRouteMessage
不知道的,因此您可以在它们之间自由移动代码。当你想从响应队列中消费时,你所需要的只是它的名字,你可以将它保存到一个变量中并传递,或者你自己生成而不是让 RabbitMQ 命名它。例如,您可以根据它正在侦听的相关 ID 来命名它,这样您就可以随时通过简单的字符串操作来弄清楚它是什么,例如"job_response.{$correlation_id}"