Symfony5 信使,相同消息处理程序的并行队列
Symfony5 messenger, parallel queues for same message handlers
Symfony 信使:
https://symfony.com/doc/current/messenger.html
问题:
Pool#1 =(user1
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
Pool#2 = (user2
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
...
Pool#100 = (user100
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
Pool#100 将不会执行,直到所有先前的 Pool 都未完成。
目标:
我需要并行队列,所有池将 运行 分开,因此每个池都有个人队列。
代码示例:
config/packages/messenger.yaml
framework:
messenger:
transports:
sync: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'App\Message\Job': sync
src/Message/Job.php
<?php
namespace App\Message;
class Job
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
src/MessageHandler/JobHandler.php
<?php
namespace App\MessageHandler;
use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class JobHandler implements MessageHandlerInterface
{
public function __construct()
{}
public function __invoke(Job $message)
{
$params = json_decode($message->getContent(), true);
dump($params);
}
}
src/Controller/JobController.php
<?php
namespace App\Controller;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
/**
* @Route("/job")
*/
class JobController extends AbstractController
{
/**
* @Route("/create", name="app_job_create")
* @param Request $request
* @param MessageBusInterface $bus
* @return JsonResponse
*/
public function create(Request $request, MessageBusInterface $bus): JsonResponse
{
// ...
$entityId = $entity->getId();
// ...
for ($i = 0; $i < 10; $i++) {
$params['entityId'] = $entityId;
$params['counter'] = $i;
$bus->dispatch(new Job(json_encode($params)));
}
return new JsonResponse([]);
}
}
更多信息:
我想继续使用它,但找不到最简单的解决方案来传递一些唯一的队列名称或 ID,然后告诉工作人员他必须只处理这个 Messages
的池。
我找到了自定义传输 https://symfony.com/doc/current/messenger/custom-transport.html,但我不确定它是否有帮助。至少我认为只有自定义传输是不够的。
我读到了 Actor models
https://www.brianstorti.com/the-actor-model/ 但如果可能的话,我只想使用 Messenger+Redis。
可能这里没有解决方案,而且这个 Messenger 还不能处理并行队列。不管怎样,我很高兴能得到任何帮助。
谢谢!
我最终使用动态队列名称解决了这个问题。
不幸的是,我被迫拒绝了信使。
此外,symfony 5(2020 年 6 月)目前不支持 RabbitMQ https://github.com/php-amqplib/RabbitMqBundle 的 SDK,我不确定 100%,但我在第 3 个版本上使用它,但我不能'不要把它放在 5 号。所以我又用了一个。
这是一个很好的快速入门指南https://blog.programster.org/rabbitmq-job-queue-with-php
在示例中,我将 RABBITMQ_QUEUE_NAME 更改为动态名称并且它工作正常。
然后一切如常,启动RabbitMQ并配置主管https://symfony.com/doc/current/messenger.html#supervisor-configuration
如果这对某人有帮助,我将很高兴,谢谢!
Symfony 信使:
https://symfony.com/doc/current/messenger.html
问题:
Pool#1 =(user1
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
Pool#2 = (user2
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
...
Pool#100 = (user100
创建一个 Job
,Job
被拆分为 10 个信使的 Message
)
Pool#100 将不会执行,直到所有先前的 Pool 都未完成。
目标:
我需要并行队列,所有池将 运行 分开,因此每个池都有个人队列。
代码示例:
config/packages/messenger.yamlframework:
messenger:
transports:
sync: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'App\Message\Job': sync
src/Message/Job.php
<?php
namespace App\Message;
class Job
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
src/MessageHandler/JobHandler.php
<?php
namespace App\MessageHandler;
use App\Message\Job;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class JobHandler implements MessageHandlerInterface
{
public function __construct()
{}
public function __invoke(Job $message)
{
$params = json_decode($message->getContent(), true);
dump($params);
}
}
src/Controller/JobController.php
<?php
namespace App\Controller;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\Annotation\Route;
/**
* @Route("/job")
*/
class JobController extends AbstractController
{
/**
* @Route("/create", name="app_job_create")
* @param Request $request
* @param MessageBusInterface $bus
* @return JsonResponse
*/
public function create(Request $request, MessageBusInterface $bus): JsonResponse
{
// ...
$entityId = $entity->getId();
// ...
for ($i = 0; $i < 10; $i++) {
$params['entityId'] = $entityId;
$params['counter'] = $i;
$bus->dispatch(new Job(json_encode($params)));
}
return new JsonResponse([]);
}
}
更多信息:
我想继续使用它,但找不到最简单的解决方案来传递一些唯一的队列名称或 ID,然后告诉工作人员他必须只处理这个 Messages
的池。
我找到了自定义传输 https://symfony.com/doc/current/messenger/custom-transport.html,但我不确定它是否有帮助。至少我认为只有自定义传输是不够的。
我读到了 Actor models
https://www.brianstorti.com/the-actor-model/ 但如果可能的话,我只想使用 Messenger+Redis。
可能这里没有解决方案,而且这个 Messenger 还不能处理并行队列。不管怎样,我很高兴能得到任何帮助。
谢谢!
我最终使用动态队列名称解决了这个问题。
不幸的是,我被迫拒绝了信使。
此外,symfony 5(2020 年 6 月)目前不支持 RabbitMQ https://github.com/php-amqplib/RabbitMqBundle 的 SDK,我不确定 100%,但我在第 3 个版本上使用它,但我不能'不要把它放在 5 号。所以我又用了一个。
这是一个很好的快速入门指南https://blog.programster.org/rabbitmq-job-queue-with-php
在示例中,我将 RABBITMQ_QUEUE_NAME 更改为动态名称并且它工作正常。
然后一切如常,启动RabbitMQ并配置主管https://symfony.com/doc/current/messenger.html#supervisor-configuration
如果这对某人有帮助,我将很高兴,谢谢!