使用 saga 事件对消费者中发布的消息做出反应
Using saga event to react to a a message published in a consumer
我正在使用带有 RabbitMq 和 Automatonymous 的 Mass Transit 进行概念验证
asp.net 核心 2.1 应用程序。我在 Postgres 中使用 EntityFramework 核心
坚持。
我想做的是在
一旦传奇完成,就会向 http rest api 和 return 请求结果。
我正在做的是:
- 使用具有 request/response 客户端
的界面连接一个事件以开始我的传奇
- 在 saga 中发布一条由消费者使用的消息
- 在消费者中发布一条消息,该消息对应于我的传奇中的另一个事件
- return 我的 saga 完成和定稿时的回应
这是我的代码:
我的界面
public interface IStartSagaRequest
{
Guid CorrelationId { get; set; }
string Name {get; set;}
}
public interface IStartSagaResponse
{
Guid CorrelationId { get; set; }
bool DidComplete {get; set;}
}
public IDoOperationRequest
{
Guid CorrelationId { get; set; }
}
public IOperationComplete
{
Guid CorrelationId { get; set; }
bool OperationSuccessful {get; set;}
}
我的 saga 实例
public class DoOperationSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public Name { get; set; }
public string CurrentState { get; set; }
}
用于在状态机中发布的 IDoOperationRequest 的具体实现
public class DoOperationRequestImpl : IDoOperationRequest
{
public Guid CorrelationId { get; set; }
}
用于在状态机中发布的 IStartSagaResponse 的具体实现
public class StartSagaResponse : IStartSagaResponse
{
public Guid CorrelationId { get; set; }
public bool DidComplete {get; set;}
}
我的状态机
public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
public State OperationPending { get; private set; }
public State Complete { get; private set; }
public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }
public ProcessOperationStateMachine()
{
InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
});
Event(() => OperationCompleteEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId);
});
Initially(
When(StartSagaRequestEvent)
.Then(context =>
{
context.Instance.CorrelationId = context.Data.CorrelationId;
context.Instance.Name = context.Data.Name;
context.Publish(new DoOperationRequestImpl
{
CorrelationId = context.Data.CorrelationId
});
})
.TransitionTo(OperationPending)
);
During(OperationPending,
When(OperationCompleteEvent)
.Then(context =>
{
// I'm just doing this for debugging
context.Instance.Name = "changed in operationComplete";
})
.ThenAsync(context => context.RespondAsync(new StartSagaResponse
{
CorrelationId = context.Data.CorrelationId,
DidComplete = true
}))
.Finalize());
}
我的消费者:
public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
await context.Publish<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
}
我是如何在 Startup.cs
中连接 DI 的
public void ConfigureServices(IServiceCollection services)
{
stateMachine = new ProcessOperationStateMachine();
SagaDbContextFactory factory = new SagaDbContextFactory();
EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
{
IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "do-operation", ep =>
{
ep.UseMessageRetry(c => c.Interval(2, 100));
ep.StateMachineSaga(stateMachine, repository);
ep.Durable = false;
});
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.Consumer(() => new DoOperationRequestConsumer());
ep.Durable = false;
});
}));
x.AddConsumer<DoOperationRequestConsumer>();
});
services.AddScoped<DoOperationRequestConsumer>();
services.AddScoped(p =>
p.GetRequiredService<IBusControl>()
.CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
new Uri("rabbitmq://localhost/do-operation?durable=false"),
TimeSpan.FromSeconds(30)));
}
并在我的控制器中发出请求:
public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
Name = "from the controller",
CorrelationId = guid
});
我看到的是我的状态机确实启动了。当(StartSagaRequestEvent)确实被击中时
并且发布了 DoOperationRequest 消息。 DoOperationRequestConsumer 确实收到消息
并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的 IOperationCompleteEvent
在我的状态机中没有被调用。当我查看数据库时,我可以看到我的传奇实例得到
使用 guid 创建,CurrentState 设置为 OperationPending。当我查看我的 rabbitmq
管理仪表板我看到一条消息在我的 DoOperationRequestConsumer 完成后发布
IOperationComplete 消息发布。我只是没有看到状态机使用 IOperationComplete
消费者发布的消息。当我设置断点并检查消费者中的消息时
我确实看到 CorrelationId 设置为与传奇的 CorrelationId 相同的值。
我还尝试在
消费者:
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));
await sendEndpoint.Send<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
但仍然无法建立连接。
我整天都在为这个问题苦苦思索,但我不确定是什么
我在这里失踪了。如果有人可以就我可能做错的事情给我一些建议,我将不胜感激
它,再次为文字墙感到抱歉,我知道它被分配阅读但我想清楚我在做什么。
非常感谢!
你的事件correlationId好像有点可疑,应该是这样的:
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
.SelectId(context => context.Message.CorrelationId);
});
这样它就初始化为消息的 CorrelationId。
无关,但您的端点应使用容器的扩展方法:
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.ConfigureConsumer<DoOperationRequestConsumer>();
ep.Durable = false;
});
并使用新的请求客户端,也可以在扩展中对其进行配置。
x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));
此外,在您的初始条件下,应删除此行:
context.Instance.CorrelationId = context.Data.CorrelationId;
我正在使用带有 RabbitMq 和 Automatonymous 的 Mass Transit 进行概念验证 asp.net 核心 2.1 应用程序。我在 Postgres 中使用 EntityFramework 核心 坚持。
我想做的是在 一旦传奇完成,就会向 http rest api 和 return 请求结果。 我正在做的是:
- 使用具有 request/response 客户端 的界面连接一个事件以开始我的传奇
- 在 saga 中发布一条由消费者使用的消息
- 在消费者中发布一条消息,该消息对应于我的传奇中的另一个事件
- return 我的 saga 完成和定稿时的回应
这是我的代码:
我的界面
public interface IStartSagaRequest
{
Guid CorrelationId { get; set; }
string Name {get; set;}
}
public interface IStartSagaResponse
{
Guid CorrelationId { get; set; }
bool DidComplete {get; set;}
}
public IDoOperationRequest
{
Guid CorrelationId { get; set; }
}
public IOperationComplete
{
Guid CorrelationId { get; set; }
bool OperationSuccessful {get; set;}
}
我的 saga 实例
public class DoOperationSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public Name { get; set; }
public string CurrentState { get; set; }
}
用于在状态机中发布的 IDoOperationRequest 的具体实现
public class DoOperationRequestImpl : IDoOperationRequest
{
public Guid CorrelationId { get; set; }
}
用于在状态机中发布的 IStartSagaResponse 的具体实现
public class StartSagaResponse : IStartSagaResponse
{
public Guid CorrelationId { get; set; }
public bool DidComplete {get; set;}
}
我的状态机
public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
public State OperationPending { get; private set; }
public State Complete { get; private set; }
public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }
public ProcessOperationStateMachine()
{
InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
});
Event(() => OperationCompleteEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId);
});
Initially(
When(StartSagaRequestEvent)
.Then(context =>
{
context.Instance.CorrelationId = context.Data.CorrelationId;
context.Instance.Name = context.Data.Name;
context.Publish(new DoOperationRequestImpl
{
CorrelationId = context.Data.CorrelationId
});
})
.TransitionTo(OperationPending)
);
During(OperationPending,
When(OperationCompleteEvent)
.Then(context =>
{
// I'm just doing this for debugging
context.Instance.Name = "changed in operationComplete";
})
.ThenAsync(context => context.RespondAsync(new StartSagaResponse
{
CorrelationId = context.Data.CorrelationId,
DidComplete = true
}))
.Finalize());
}
我的消费者:
public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
await context.Publish<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
}
我是如何在 Startup.cs
中连接 DI 的public void ConfigureServices(IServiceCollection services)
{
stateMachine = new ProcessOperationStateMachine();
SagaDbContextFactory factory = new SagaDbContextFactory();
EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
{
IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "do-operation", ep =>
{
ep.UseMessageRetry(c => c.Interval(2, 100));
ep.StateMachineSaga(stateMachine, repository);
ep.Durable = false;
});
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.Consumer(() => new DoOperationRequestConsumer());
ep.Durable = false;
});
}));
x.AddConsumer<DoOperationRequestConsumer>();
});
services.AddScoped<DoOperationRequestConsumer>();
services.AddScoped(p =>
p.GetRequiredService<IBusControl>()
.CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
new Uri("rabbitmq://localhost/do-operation?durable=false"),
TimeSpan.FromSeconds(30)));
}
并在我的控制器中发出请求:
public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
Name = "from the controller",
CorrelationId = guid
});
我看到的是我的状态机确实启动了。当(StartSagaRequestEvent)确实被击中时 并且发布了 DoOperationRequest 消息。 DoOperationRequestConsumer 确实收到消息 并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的 IOperationCompleteEvent 在我的状态机中没有被调用。当我查看数据库时,我可以看到我的传奇实例得到 使用 guid 创建,CurrentState 设置为 OperationPending。当我查看我的 rabbitmq 管理仪表板我看到一条消息在我的 DoOperationRequestConsumer 完成后发布 IOperationComplete 消息发布。我只是没有看到状态机使用 IOperationComplete 消费者发布的消息。当我设置断点并检查消费者中的消息时 我确实看到 CorrelationId 设置为与传奇的 CorrelationId 相同的值。
我还尝试在 消费者:
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));
await sendEndpoint.Send<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
但仍然无法建立连接。
我整天都在为这个问题苦苦思索,但我不确定是什么 我在这里失踪了。如果有人可以就我可能做错的事情给我一些建议,我将不胜感激 它,再次为文字墙感到抱歉,我知道它被分配阅读但我想清楚我在做什么。 非常感谢!
你的事件correlationId好像有点可疑,应该是这样的:
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
.SelectId(context => context.Message.CorrelationId);
});
这样它就初始化为消息的 CorrelationId。
无关,但您的端点应使用容器的扩展方法:
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.ConfigureConsumer<DoOperationRequestConsumer>();
ep.Durable = false;
});
并使用新的请求客户端,也可以在扩展中对其进行配置。
x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));
此外,在您的初始条件下,应删除此行:
context.Instance.CorrelationId = context.Data.CorrelationId;