使用 saga 事件对消费者中发布的消息做出反应

Using saga event to react to a a message published in a consumer

我正在使用带有 RabbitMq 和 Automatonymous 的 Mass Transit 进行概念验证 asp.net 核心 2.1 应用程序。我在 Postgres 中使用 EntityFramework 核心 坚持。

我想做的是在 一旦传奇完成,就会向 http rest api 和 return 请求结果。 我正在做的是:

这是我的代码:

我的界面

public interface IStartSagaRequest
{
    Guid CorrelationId { get; set; }
    string Name {get; set;}
}

public interface IStartSagaResponse
{
    Guid CorrelationId { get; set; }
    bool DidComplete {get; set;}
}

public IDoOperationRequest
{
    Guid CorrelationId { get; set; }
}

public IOperationComplete
{
    Guid CorrelationId { get; set; }
    bool OperationSuccessful {get; set;}
}

我的 saga 实例

public class DoOperationSaga : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public Name { get; set; }
    public string CurrentState { get; set; }
}

用于在状态机中发布的 IDoOperationRequest 的具体实现

public class DoOperationRequestImpl : IDoOperationRequest
{
    public Guid CorrelationId { get; set; }
}

用于在状态机中发布的 IStartSagaResponse 的具体实现

public class StartSagaResponse : IStartSagaResponse
{
    public Guid CorrelationId { get; set; }
    public bool DidComplete {get; set;}
}

我的状态机

public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
    public State OperationPending { get; private set; }
    public State Complete { get; private set; }


    public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
    public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }


    public ProcessOperationStateMachine()
    {
        InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);

        Event(() => StartSagaRequestEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                    context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
        });

        Event(() => OperationCompleteEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                context => context.Message.CorrelationId);
        });


        Initially(
            When(StartSagaRequestEvent)
                .Then(context =>
                {
                    context.Instance.CorrelationId = context.Data.CorrelationId;
                    context.Instance.Name = context.Data.Name;
                    context.Publish(new DoOperationRequestImpl
                    {
                        CorrelationId = context.Data.CorrelationId
                    });

                })
                .TransitionTo(OperationPending)
        );

        During(OperationPending,
            When(OperationCompleteEvent)
                .Then(context =>
                {
                    // I'm just doing this for debugging
                    context.Instance.Name = "changed in operationComplete";
                })
                .ThenAsync(context => context.RespondAsync(new StartSagaResponse 
                { 
                    CorrelationId = context.Data.CorrelationId,
                    DidComplete = true
                }))
                .Finalize());

}

我的消费者:

public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{

    public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
    {
       await context.Publish<IOperationComplete>(new
       {
          CorrelationId = context.Message.CorrelationId,
          OperationSuccessful = true
       });
    }
}

我是如何在 Startup.cs

中连接 DI 的
public void ConfigureServices(IServiceCollection services)
{
    stateMachine = new ProcessOperationStateMachine();

    SagaDbContextFactory factory = new SagaDbContextFactory();
    EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);

    services.AddMassTransit(x =>
    {

        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            sbc.ReceiveEndpoint(host, "do-operation", ep =>
            {
                ep.UseMessageRetry(c => c.Interval(2, 100));
                ep.StateMachineSaga(stateMachine, repository);
                ep.Durable = false;
            });

            sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
            {
                ep.Consumer(() => new DoOperationRequestConsumer());
                ep.Durable = false;
            });
        }));
        x.AddConsumer<DoOperationRequestConsumer>();
    });

    services.AddScoped<DoOperationRequestConsumer>();

    services.AddScoped(p =>
        p.GetRequiredService<IBusControl>()
            .CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
                new Uri("rabbitmq://localhost/do-operation?durable=false"),
                TimeSpan.FromSeconds(30)));

}

并在我的控制器中发出请求:

public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
    Name = "from the controller",
    CorrelationId = guid
});

我看到的是我的状态机确实启动了。当(StartSagaRequestEvent)确实被击中时 并且发布了 DoOperationRequest 消息。 DoOperationRequestConsumer 确实收到消息 并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的 IOperationCompleteEvent 在我的状态机中没有被调用。当我查看数据库时,我可以看到我的传奇实例得到 使用 guid 创建,CurrentState 设置为 OperationPending。当我查看我的 rabbitmq 管理仪表板我看到一条消息在我的 DoOperationRequestConsumer 完成后发布 IOperationComplete 消息发布。我只是没有看到状态机使用 IOperationComplete 消费者发布的消息。当我设置断点并检查消费者中的消息时 我确实看到 CorrelationId 设置为与传奇的 CorrelationId 相同的值。

我还尝试在 消费者:

public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
    ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));

    await sendEndpoint.Send<IOperationComplete>(new
    {
      CorrelationId = context.Message.CorrelationId,
      OperationSuccessful = true
    });
}

但仍然无法建立连接。

我整天都在为这个问题苦苦思索,但我不确定是什么 我在这里失踪了。如果有人可以就我可能做错的事情给我一些建议,我将不胜感激 它,再次为文字墙感到抱歉,我知道它被分配阅读但我想清楚我在做什么。 非常感谢!

你的事件correlationId好像有点可疑,应该是这样的:

Event(() => StartSagaRequestEvent, eventConfigurator =>
{
    eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
        .SelectId(context => context.Message.CorrelationId);
});

这样它就初始化为消息的 CorrelationId。

无关,但您的端点应使用容器的扩展方法:

sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
    ep.ConfigureConsumer<DoOperationRequestConsumer>();
    ep.Durable = false;
});

并使用新的请求客户端,也可以在扩展中对其进行配置。

x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));

此外,在您的初始条件下,应删除此行:

context.Instance.CorrelationId = context.Data.CorrelationId;