为什么我在跳过的队列中收到消息

Why am I getting messages in the skipped queue

我在 fork/join 配置中设置了一个 saga。

传奇中定义的事件

当文件进入单独的侦听器时,进程开始。

下游端点对文件做一些工作

Consumer<FileSavedToMsg>

我在 saga_skipped 队列中收到一个 FileSavedToMsg。我只能假设这是由于 FileSavedToMsg 上有一个 correlationId,因为 saga 本身没有在其状态机中使用 FileSavedToMsg 并且没有 Event<FileSavedToMsg>.

如果这就是为什么...我应该在 CorrelationId 以外的字段中传递 correlationId,这样 saga 就看不到它了吗?我在某个地方需要它,所以我可以用它标记 SomeOtherMsg。

这里是 saga 端点的定义方式

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });              

            cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
            {
                epCfg.StateMachineSaga(machine, repository);
            });
});

工作人员端点的定义方式如下

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
        {
            c.PrefetchCount = 1;
            c.Instance(_studyCreatedMsgConsumer);
        });
 });

这些 运行 在同一台机器上,但在单独的 Console/Topshelf 应用程序中。

如果您在队列上获取的消息未被该接收端点上的消费者使用,则可能是您之前使用了该消息类型并将其从消费者(或 saga,在您的情况下)中删除) 或者您出于其他目的使用队列并且它消耗了该消息类型。

无论哪种方式,如果您进入 RabbitMQ 管理控制台并查找队列,您可以展开 Bindings V 形图标,单击以转到同名交换(这是标准的 MassTransit 惯例),然后展开交换的绑定,以查看哪些消息类型(命名为 .NET 类型名称的交换)绑定到该交换。

如果您看到一个没有被端点消耗的,那就是罪魁祸首。您可以使用 UI 取消绑定,之后发布的消息将不再发送到队列。