具有绑定键的 Symfony Messenger 队列 - 重试策略

Symfony messenger queues with binding key - retry strategy

我正在我工作的公司实现 Messenger。我发现路由密钥有问题。

我想将一条消息发送到两个队列。另外两个应用程序将处理这个队列。一切正常,但当处理程序抛出异常时我发现了问题。它将消息发送一个发送到两个重试队列,因为重试队列是通过绑定键匹配的,这对于这个队列是相同的。

最后重试 3 次,我的 dlqs 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?

我的配置如下:

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~

我用这样的戳记发送事件:

class MessengerTestCommand extends Command
{
    protected static $defaultName = 'app:messenger-test';
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;

        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $this->bus->dispatch(
            new TestEvent(), [
                new AmqpStamp('first')
            ]
        );

        $io->success('Done');

        return 0;
    }
}

处理程序:

class TestEventHandler implements MessageHandlerInterface
{
    public function __invoke(TestEvent $event)
    {
        dump($event->id);

        throw new \Exception('Boom');
    }
}

我在兔子身上发现了什么:

现在我正在尝试这样的配置:

framework:
    messenger:
        failure_transport: failed
        default_bus: command.bus
        transports:
            async:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v1:
                            binding_keys:
                                - first
            async1:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v2:
                            binding_keys:
                                - first
            failed:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    exchange:
                        name: olimp_dead
                        type: topic
                    queues:
                        create_miniature_v1_dlq:
                            binding_keys:
                                - first
                        create_miniature_v2_dlq:
                            binding_keys:
                                - first

        routing:
            'Olimp\Messenger\TestEvent': [async, async1]

并使用两个 运行 控制台命令:

bin/console messenger:consume async
bin/console messenger:consume async1

但是效果一样。

好的,我自己找到了答案。

我创建了新的重试策略。我将 queue_name_pattern 更改为 %routing_key%_%delay% 并创建了我自己的 SendFailedMessageForRetryListener。为了重试信封,我添加了 stamp new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName()) 用于为延迟队列创建正确的路由密钥。因此,我没有根据交换名称创建队列,而是根据队列名称创建了队列。

还有两件事:

队列中的绑定键如下所示:

queues:
    create_miniature_v1:
        binding_keys:
            - create_miniature_v1
            - first
    create_miniature_v2:
        binding_keys:
            - create_miniature_v2
            - first

和失败的队列:

queues:
    create_miniature_v1_dlq:
        binding_keys:
            - create_miniature_v1
    create_miniature_v2_dlq:
        binding_keys:
            - create_miniature_v2