如何在独立代码中使用 Symfony Messenger 组件发送 AMQP 消息

How to use Symfony Messenger component in standalone code to send AMQP messages

我们在 Symfony 5 项目中使用 Symfony Messenger 来与 RabbitMQ 集成。在 Symfony 中发送消息时它工作正常,但我需要能够使用 Messenger 组件从一些不是使用 Symfony 框架构建的遗留 PHP 应用程序发送消息。

在 Symfony 下,它通过注入 MessageBusInterface 来处理所有的魔法,我需要做的就是像这样:

    public function processTestMessage(MessageBusInterface $bus)
    {
        $bus->dispatch(new TestMessage('Hello World!');
    }

我需要以某种方式实例化我自己的 $bus 版本,它将以与 Symfony 相同的方式发送 AMQP 消息。我一直在尝试重现 Symfony 在幕后为实现这一目标所做的一切,但未能将所有细节放在一起。

问题的症结在于创建我自己的 SendMessageMiddleware 来做与 Symfony 相同的事情。之后就简单了:

    $sendersLocator = ???
    $eventDispatcher = ???

    $sendMessageMiddleware = new($sendersLocator, $eventDispatcher);
    $bus = new MessageBus([$sendMessageMiddleware]);

有没有人有任何使用 Messenger 组件在 Symfony 之外发送 AMQP 消息的工作代码示例?

这可以改进,但对我有用:

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;

$sendersLocator = new class implements SendersLocatorInterface {
    public function getSenders(Envelope $envelope): iterable
    {
        $connection = new Connection(
            [
                'hosts' => 'localhost',
                'port' => 5672,
                'vhosts' => '/',
                'login' => 'guest',
                'password' => 'guest'
            ],
            [
                'name' => 'messages'
            ],
            [
                'messages' => []
            ]
        );
        return [
            'async' => new AmqpSender($connection)
        ];
    }
};

$middleware = new SendMessageMiddleware($sendersLocator);

$bus = new MessageBus([$middleware]);

$bus->dispatch(new MyMessage());

我修改了上面的答案,让我将 RabbitMQ 凭据作为环境变量传递。这就是我的应用程序所需要的。我试图编写我自己的 DSN 解析器,发现 Symfony 已经这样做了,所以我基本上从那里提取了代码。

如果未设置环境变量,则默认使用与上述示例相同的设置。

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;

$sendersLocator = new class implements SendersLocatorInterface {
    public function getSenders(Envelope $envelope): iterable
    {
        $dsn = getenv('MESSENGER_TRANSPORT_DSN');

        $connection = new Connection::fromDsn($dsn); 

        return [
            'async' => new AmqpSender($connection)
        ];
    }
};

$middleware = new SendMessageMiddleware($sendersLocator);

$bus = new MessageBus([$middleware]);

$bus->dispatch(new MyMessage());