具有绑定键的 Symfony Messenger 队列 - 重试策略
Symfony messenger queues with binding key - retry strategy
我正在我工作的公司实现 Messenger。我发现路由密钥有问题。
我想将一条消息发送到两个队列。另外两个应用程序将处理这个队列。一切正常,但当处理程序抛出异常时我发现了问题。它将消息发送一个发送到两个重试队列,因为重试队列是通过绑定键匹配的,这对于这个队列是相同的。
最后重试 3 次,我的 dlqs 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?
我的配置如下:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': async
buses:
command.bus:
middleware:
- Olimp\Shared\Application\Message\Middleware\EventDispatcher
- doctrine_close_connection
- doctrine_transaction
event.bus:
default_middleware: allow_no_handlers
query.bus: ~
我用这样的戳记发送事件:
class MessengerTestCommand extends Command
{
protected static $defaultName = 'app:messenger-test';
private MessageBusInterface $bus;
public function __construct(MessageBusInterface $bus)
{
$this->bus = $bus;
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$this->bus->dispatch(
new TestEvent(), [
new AmqpStamp('first')
]
);
$io->success('Done');
return 0;
}
}
处理程序:
class TestEventHandler implements MessageHandlerInterface
{
public function __invoke(TestEvent $event)
{
dump($event->id);
throw new \Exception('Boom');
}
}
我在兔子身上发现了什么:
现在我正在尝试这样的配置:
framework:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
async1:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': [async, async1]
并使用两个 运行 控制台命令:
bin/console messenger:consume async
bin/console messenger:consume async1
但是效果一样。
好的,我自己找到了答案。
我创建了新的重试策略。我将 queue_name_pattern
更改为 %routing_key%_%delay%
并创建了我自己的 SendFailedMessageForRetryListener
。为了重试信封,我添加了 stamp new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName())
用于为延迟队列创建正确的路由密钥。因此,我没有根据交换名称创建队列,而是根据队列名称创建了队列。
还有两件事:
队列中的绑定键如下所示:
queues:
create_miniature_v1:
binding_keys:
- create_miniature_v1
- first
create_miniature_v2:
binding_keys:
- create_miniature_v2
- first
和失败的队列:
queues:
create_miniature_v1_dlq:
binding_keys:
- create_miniature_v1
create_miniature_v2_dlq:
binding_keys:
- create_miniature_v2
我正在我工作的公司实现 Messenger。我发现路由密钥有问题。
我想将一条消息发送到两个队列。另外两个应用程序将处理这个队列。一切正常,但当处理程序抛出异常时我发现了问题。它将消息发送一个发送到两个重试队列,因为重试队列是通过绑定键匹配的,这对于这个队列是相同的。
最后重试 3 次,我的 dlqs 上有 16 条消息。你能帮我解决这个问题吗?是否可以基于队列而不是路由键创建重试策略?
我的配置如下:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': async
buses:
command.bus:
middleware:
- Olimp\Shared\Application\Message\Middleware\EventDispatcher
- doctrine_close_connection
- doctrine_transaction
event.bus:
default_middleware: allow_no_handlers
query.bus: ~
我用这样的戳记发送事件:
class MessengerTestCommand extends Command
{
protected static $defaultName = 'app:messenger-test';
private MessageBusInterface $bus;
public function __construct(MessageBusInterface $bus)
{
$this->bus = $bus;
parent::__construct();
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$this->bus->dispatch(
new TestEvent(), [
new AmqpStamp('first')
]
);
$io->success('Done');
return 0;
}
}
处理程序:
class TestEventHandler implements MessageHandlerInterface
{
public function __invoke(TestEvent $event)
{
dump($event->id);
throw new \Exception('Boom');
}
}
我在兔子身上发现了什么:
现在我正在尝试这样的配置:
framework:
messenger:
failure_transport: failed
default_bus: command.bus
transports:
async:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v1:
binding_keys:
- first
async1:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 2
max_delay: 0
exchange:
name: olimp
type: topic
queues:
create_miniature_v2:
binding_keys:
- first
failed:
dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
options:
exchange:
name: olimp_dead
type: topic
queues:
create_miniature_v1_dlq:
binding_keys:
- first
create_miniature_v2_dlq:
binding_keys:
- first
routing:
'Olimp\Messenger\TestEvent': [async, async1]
并使用两个 运行 控制台命令:
bin/console messenger:consume async
bin/console messenger:consume async1
但是效果一样。
好的,我自己找到了答案。
我创建了新的重试策略。我将 queue_name_pattern
更改为 %routing_key%_%delay%
并创建了我自己的 SendFailedMessageForRetryListener
。为了重试信封,我添加了 stamp new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName())
用于为延迟队列创建正确的路由密钥。因此,我没有根据交换名称创建队列,而是根据队列名称创建了队列。
还有两件事:
队列中的绑定键如下所示:
queues:
create_miniature_v1:
binding_keys:
- create_miniature_v1
- first
create_miniature_v2:
binding_keys:
- create_miniature_v2
- first
和失败的队列:
queues:
create_miniature_v1_dlq:
binding_keys:
- create_miniature_v1
create_miniature_v2_dlq:
binding_keys:
- create_miniature_v2