Symfony5 信使,相同消息处理程序的并行队列

Symfony5 messenger, parallel queues for same message handlers

Symfony 信使:

https://symfony.com/doc/current/messenger.html

问题:

Pool#1 =(user1 创建一个 JobJob 被拆分为 10 个信使的 Message
Pool#2 = (user2 创建一个 JobJob 被拆分为 10 个信使的 Message)
...
Pool#100 = (user100 创建一个 JobJob 被拆分为 10 个信使的 Message)

Pool#100 将不会执行,直到所有先前的 Pool 都未完成。

目标:

我需要并行队列,所有池将 运行 分开,因此每个池都有个人队列。

代码示例:

config/packages/messenger.yaml
framework:
    messenger:
        transports:
            sync: "%env(MESSENGER_TRANSPORT_DSN)%"
        routing:
            'App\Message\Job': sync
src/Message/Job.php
<?php

namespace App\Message;

class Job
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}
src/MessageHandler/JobHandler.php
<?php

namespace App\MessageHandler;

use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class JobHandler implements MessageHandlerInterface
{
    public function __construct()
    {}

    public function __invoke(Job $message)
    {
        $params = json_decode($message->getContent(), true);
        dump($params);
    }
}
src/Controller/JobController.php
<?php

namespace App\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;

/**
 * @Route("/job")
 */
class JobController extends AbstractController
{
    /**
     * @Route("/create", name="app_job_create")
     * @param Request $request
     * @param MessageBusInterface $bus
     * @return JsonResponse
     */
    public function create(Request $request, MessageBusInterface $bus): JsonResponse
    {
        // ...
        $entityId = $entity->getId();
        // ...

        for ($i = 0; $i < 10; $i++) {
            $params['entityId'] = $entityId;
            $params['counter'] = $i;
            $bus->dispatch(new Job(json_encode($params)));
        }

        return new JsonResponse([]);
    }
}

更多信息:

我想继续使用它,但找不到最简单的解决方案来传递一些唯一的队列名称或 ID,然后告诉工作人员他必须只处理这个 Messages 的池。
我找到了自定义传输 https://symfony.com/doc/current/messenger/custom-transport.html,但我不确定它是否有帮助。至少我认为只有自定义传输是不够的。
我读到了 Actor models https://www.brianstorti.com/the-actor-model/ 但如果可能的话,我只想使用 Messenger+Redis。

可能这里没有解决方案,而且这个 Messenger 还不能处理并行队列。不管怎样,我很高兴能得到任何帮助。
谢谢!

我最终使用动态队列名称解决了这个问题。
不幸的是,我被迫拒绝了信使。

此外,symfony 5(2020 年 6 月)目前不支持 RabbitMQ https://github.com/php-amqplib/RabbitMqBundle 的 SDK,我不确定 100%,但我在第 3 个版本上使用它,但我不能'不要把它放在 5 号。所以我又用了一个。

这是一个很好的快速入门指南https://blog.programster.org/rabbitmq-job-queue-with-php
在示例中,我将 RABBITMQ_QUEUE_NAME 更改为动态名称并且它工作正常。

然后一切如常,启动RabbitMQ并配置主管https://symfony.com/doc/current/messenger.html#supervisor-configuration

如果这对某人有帮助,我将很高兴,谢谢!