在发布之前确保存在 AMQP 交换绑定

Ensure that AMQP exchange binding exists before publishing

系统布局

我们有三个系统:

  1. 一个API端点(发布者和消费者)
  2. RabbitMQ 服务器
  3. 主要application/processor(发布者和消费者)

系统1和3都使用Laravel,与RabbitMQ交互使用PHPAMQPLIB

消息路径

系统 1(API 端点)向系统 3 的 RabbitMQ 服务器发送序列化作业以进行处理。然后它立即声明一个新的随机命名的队列,将一个交换器绑定到具有相关 ID 的该队列 - 并开始侦听消息。

与此同时,系统 3 完成了该作业,一旦它完成,就会将该作业的详细信息响应到 RabbitMQ,并在交换中使用相关 ID。

问题和我尝试过的方法

我经常发现这个过程失败了。发送和接收作业,发送响应 - 但系统 1 从未读取此响应,而且我没有看到它在 RabbitMQ 中发布。

我对此进行了一些广泛的调试,但没有找到根本原因。我目前的理论是系统 3 返回响应的速度如此之快,以至于系统 1 甚至还没有声明新的队列和交换绑定。这意味着系统 3 的响应无处可去,结果消失了.这个理论主要基于这样一个事实,即如果我将系统 3 上的作业设置为以较低的频率处理,系统就会变得更加可靠。作业处理得越快,就越不可靠。

问题是:我怎样才能防止这种情况发生?还是我还缺少其他东西?我当然希望在不破坏 Request/Response-pattern.

的情况下快速高效地处理这些作业

我已经记录了两个系统的输出 - 两个系统都使用相同的相关 ID,系统 3 在发布时得到一个 ACK​​ - 而系统 1 有一个声明的队列,没有消息最终会超时。

代码示例 1:发布消息

/**
 * Helper method to publish a message to RabbitMQ
 *
 * @param $exchange
 * @param $message
 * @param $correlation_id
 * @return bool
 */
public static function publishAMQPRouteMessage($exchange, $message, $correlation_id)
{
    try {
        $connection = new AMQPStreamConnection(
            env('RABBITMQ_HOST'),
            env('RABBITMQ_PORT'),
            env('RABBITMQ_LOGIN'),
            env('RABBITMQ_PASSWORD'),
            env('RABBITMQ_VHOST')
        );
        $channel = $connection->channel();

        $channel->set_ack_handler(function (AMQPMessage $message) {
            Log::info('[AMQPLib::publishAMQPRouteMessage()] - Message ACK');
        });

        $channel->set_nack_handler(function (AMQPMessage $message) {
            Log::error('[AMQPLib::publishAMQPRouteMessage()] - Message NACK');
        });

        $channel->confirm_select();

        $channel->exchange_declare(
            $exchange,
            'direct',
            false,
            false,
            false
        );

        $msg = new AMQPMessage($message);
        $channel->basic_publish($msg, $exchange, $correlation_id);

        $channel->wait_for_pending_acks();

        $channel->close();
        $connection->close();

        return true;
    } catch (Exception $e) {
        return false;
    }
}

代码示例 2:等待消息响应

/**
 * Helper method to fetch messages from RabbitMQ.
 *
 * @param $exchange
 * @param $correlation_id
 * @return mixed
 */
public static function readAMQPRouteMessage($exchange, $correlation_id)
{
    $connection = new AMQPStreamConnection(
        env('RABBITMQ_HOST'),
        env('RABBITMQ_PORT'),
        env('RABBITMQ_LOGIN'),
        env('RABBITMQ_PASSWORD'),
        env('RABBITMQ_VHOST')
    );
    $channel = $connection->channel();

    $channel->exchange_declare(
        $exchange,
        'direct',
        false,
        false,
        false
    );

    list($queue_name, ,) = $channel->queue_declare(
        '',
        false,
        false,
        true,
        false
    );

    $channel->queue_bind($queue_name, $exchange, $correlation_id);

    $callback = function ($msg) {
        return self::$rfcResponse = $msg->body;
    };

    $channel->basic_consume(
        $queue_name,
        '',
        false,
        true,
        false,
        false,
        $callback
    );

    if (!count($channel->callbacks)) {
        Log::error('[AMQPLib::readAMQPRouteMessage()] - No callbacks registered!');
    }

    while (self::$rfcResponse === null && count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();

    return self::$rfcResponse;
}

感谢您提供的任何建议!

我可能遗漏了一些东西,但是当我读到这篇文章时:

System 1 (the API Endpoint) sends a serialized job to the RabbitMQ Server for System 3 to process. It then immediately declares a new randomly named queue, binds an exchange to that queue with a correlation ID - and starts to listen for messages.

我第一个想到的是"why do you wait until the message is sent before declaring the return queue?"

事实上,我们在这里有一系列单独的步骤:

  1. 正在生成关联 ID
  2. 将包含该 ID 的消息发布到交易所以在其他地方处理
  3. 正在声明一个新队列来接收响应
  4. 使用关联 ID 将队列绑定到交换器
  5. 将回调绑定到新队列
  6. 等待回复

响应要到第 2 步之后才会出现,因此我们希望尽可能晚地进行。唯一不能在此之前的步骤是第 6 步,但在代码中将第 5 步和第 6 步放在一起可能很方便。所以我将代码重新排列为:

  1. 正在生成关联 ID
  2. 正在声明一个新队列来接收响应
  3. 使用关联 ID 将队列绑定到交换器
  4. 将包含相关 ID 的消息发布到交易所以在其他地方处理
  5. 将回调绑定到新队列
  6. 等待回复

这样,无论响应发布多快,它都会被第 2 步中声明的队列拾取,一旦绑定回调并开始等待,您就会处理它。

请注意,没有什么是 readAMQPRouteMessage 知道而 publishAMQPRouteMessage 不知道的,因此您可以在它们之间自由移动代码。当你想从响应队列中消费时,你所需要的只是它的名字,你可以将它保存到一个变量中并传递,或者你自己生成而不是让 RabbitMQ 命名它。例如,您可以根据它正在侦听的相关 ID 来命名它,这样您就可以随时通过简单的字符串操作来弄清楚它是什么,例如"job_response.{$correlation_id}"