php-amqplib 和 RabbitMQ 的死字?

Dead lettering with php-amqplib and RabbitMQ?

我刚刚开始使用 php-amqplib 和 RabbitMQ,并且想要一种方法来处理由于某种原因无法处理且被拒绝的消息。我认为人们处理这个问题的一种方式是使用死信队列。我正在尝试设置它,但到目前为止还没有成功,希望有人可以提供一些建议。

我的队列初始化看起来有点像:

class BaseAbstract
{
    /** @var AMQPStreamConnection */
    protected $connection;
    /** @var AMQPChannel */
    protected $channel;
    /** @var array */
    protected $deadLetter = [
        'exchange' => 'dead_letter',
        'type' => 'direct',
        'queue' => 'delay_queue',
        'ttl' => 10000 // in milliseconds
    ];

    protected function initConnection(array $config)
    {
        try {
            $this->connection = AMQPStreamConnection::create_connection($config);
            $this->channel = $this->connection->channel();

            // Setup dead letter exchange and queue
            $this->channel->exchange_declare($this->deadLetter['exchange'], $this->deadLetter['type'], false, true, false);
            $this->channel->queue_declare($this->deadLetter['queue'], false, true, false, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue'],
                'x-message-ttl' => $this->deadLetter['ttl']
            ]));
            $this->channel->queue_bind($this->deadLetter['queue'], $this->deadLetter['exchange']);

            // Set up regular exchange and queue
            $this->channel->exchange_declare($this->getExchangeName(), $this->getExchangeType(), true, true, false);
            $this->channel->queue_declare($this->getQueueName(), true, true, false, false, new AMQPTable([
                'x-dead-letter-exchange' => $this->deadLetter['exchange'],
                'x-dead-letter-routing-key' => $this->deadLetter['queue']
            ]));

            if (method_exists($this, 'getRouteKey')) {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName(), $this->getRouteKey());
            } else {
                $this->channel->queue_bind($this->getQueueName(), $this->getExchangeName());
            }
        } catch (\Exception $e) {
            throw new \RuntimeException('Cannot connect to the RabbitMQ service: ' . $e->getMessage());
        }
        return $this;
    }

    // ...
}

我认为应该设置我的死信交换和队列,然后还设置我的常规交换和队列(使用扩展 类 提供的 getRouteKey、getQueueName 和 getExchangeName/Type 方法)

当我尝试处理如下消息时:

public function process(AMQPMessage $message)
{
    $msg = json_decode($message->body);
    if (empty($msg->payload) || empty($msg->payload->run)) {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
        return;
    }

    // removed for post brevity, but compose $cmd variable

    exec($cmd, $output, $returned);
    if ($returned !== 0) {
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } else {
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
}

但是我找回了错误Something went wrong: Cannot connect to the RabbitMQ service: PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'delay_queue' in vhost '/': received 'dead_letter' but current is ''

这是我设置死字的方式吗?我在周围看到的不同示例似乎都显示出一些不同的处理方式,none 其中似乎对我有用。所以我显然误解了这里的一些东西,我很感激任何建议。 :)

设置(永久)队列和交换是您想要在部署代码时一次做的事情,而不是每次您想要使用它们时。将它们想象成您的数据库模式 - 虽然协议提供 "declare" 而不是 "create",但您通常应该编写 假设事物以特定方式配置的代码 。您可以将代码的第一部分构建到安装脚本中,或者使用 the web- and CLI-based management plugin 以简单的 JSON 格式管理这些代码。

您看到的错误可能是尝试在不同时间使用不同参数声明相同队列的结果 - "declare" 不会替换或重新配置现有队列,它将参数视为"pre-conditions"待查。您需要删除并重新创建队列,或通过管理 UI 对其进行管理,以更改其现有参数。

运行-time 声明在您想要动态 在您的代理中创建项目时变得更有用。你可以给他们起你知道的唯一的名字,或者传递 null 作为名字来接收随机生成的名字(人们有时指的是创建一个 "anonymous queue",但是每个队列中RabbitMQ 有一个名字,即使你没有选择它)。


如果我没看错的话,你的 "schema" 看起来像这样:

# Dead Letter eXchange and Queue
Exchange: DLX
Queue: DLQ; dead letter exchange: DLX, with key "DLQ"; automatic expiry
Binding: copy messages arriving in DLX to DLQ

# Regular eXchange and Queue
Exchange: RX
Queue: RQ; dead letter exchange: DLX, with key "DLQ"
Binding: copy messages from RX to RQ, optionally filtered by routing key

当消息在 RQ 中为 "nacked" 时,它将被传递给 DLX,其路由密钥被覆盖为 "DLQ"。然后它将被复制到 DLQ。如果它从 DLQ 中被拒绝,或者在那个队列中等待太久,它将被路由到它自己。

我会用两种方式简化:

  • 从 "dead letter queue"(我标记为 DLQ)中删除死信交换和 TTL;该循环可能比有用更令人困惑。
  • 从常规队列(我标记为 RQ)中删除 x-dead-letter-routing-key 选项。常规队列的配置不需要知道死信交换是否有零个、一个或多个队列附加到它,所以不应该知道另一个队列的名称。如果您希望 nacked 消息直接进入一个队列,只需将其设为 "fanout exchange"(忽略路由键)或绑定键设置为 #(这是一个通配符)的 "topic exchange"匹配所有路由键)。

另一种可能是将 x-dead-letter-routing-key 设置为 常规 队列的名称,即标记它来自哪个队列。但在您有一个用例之前,我会保持简单并使用其原始路由密钥留下消息。