Rebus Saga:当订阅者发送给发布者时,消息未按正确的顺序到达
Rebus Saga: Message is not arrived in correct sequence when subscriber(s) sent to publisher
最近,我们将 rebus 版本从 1 升级到 5,然后我们遇到了 Saga 处理程序的问题。现在我们没有从不同的订阅者那里得到正确顺序的响应。
我们有不同的来源来验证请求,为此我们有协调器来处理来自不同验证源的所有响应,但现在的问题是:所有 "SearchStarted" 消息并非来自所有验证的协调器首先到达源,因此我们无法检查有多少认证源开始认证。
尝试以不同的方式发送消息,例如
1. 使用 SEND 方法代替 REPLY。
2. 在发送响应之前尝试使用我们的 without await 关键字。
3. 尝试使用 Send/Reply 方法的 .Wait() 方法。
身份验证协调器:
public class AuthenticationSaga : Saga<AuthenticationSagaData>, IAmInitiatedBy<AuthenticationRequest>, IHandleMessages<SearchStarted>, IHandleMessages<SearchCompleted>, IHandleMessages<AuthenticationResponse>
{
private readonly IBus _bus;
public IBus Bus
{
get { return _bus; }
}
public AuthenticationSaga(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
if (!IsNew) return;
Data.Id = new Guid(MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId]);
Data.ReturnAddress = MessageContext.Current.Headers[Rebus.Messages.Headers.ReturnAddress];
message.UniqueId = Data.Id.ToString();
Data.RequestMessage = message;
Bus.Publish(message);
}
public async Task Handle(SearchStarted message)
{
}
public async Task Handle(SearchCompleted message)
{
}
public async Task Handle(AuthenticationResponse message)
{
}
protected override void CorrelateMessages(ICorrelationConfig<AuthenticationSagaData> config)
{
config.Correlate<AuthenticationRequest>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchStarted>(m => m.UniqueId, d => d.Id);
config.Correlate<AuthenticationResponse>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchCompleted>(m => m.UniqueId, d => d.Id);
}
}
AuthenticationLdap:
public class AuthenticationLdapHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationLdapHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateLdap(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
AuthenticationNative:
public class AuthenticationNativeHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationNativeHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateNative(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
我们期待 AuthenticationCoordinator 中的响应如下:
来自 Ldap 的 SearchStarted 消息
来自本机的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自本机的 AuthenticationResponse 消息
- 来自本机的 SearchCompleted 消息
但现在我们收到的响应顺序如下:
- 来自 Ldap 的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自本机的 SearchStarted 消息
- 来自本机的 AuthenticationResponse 消息
- 来自本机的 SearchCompleted 消息
我们可以设置消息的优先级吗?我们如何才能在 rebus 5 中获得高于预期的响应。
您所看到的很可能是 Rebus 确保在您的处理程序完成执行后发送所有传出消息这一事实的结果。
它通过在其事务上下文中征集所有总线操作来做到这一点,只有在您的处理程序代码完成后才会提交。
这意味着代码像
public async Task Handle(string message)
{
await bus.Reply("this is message 1");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 2");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 3");
}
当提交 Rebus 事务上下文时,将导致同时发送消息 1、2 和 3,这意味着接收方将以随机顺序接收它们。
如果您要立即从您的处理程序发送消息,您可以 "dismantle" 像这样的事务上下文:
var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
// current transaction will never know....
await bus.Send(whee);
}
finally
{
AmbientTransactionContext.SetCurrent(transactionContext);
}
我建议您将其包含在 IDisposable
的实现中,这样可以像这样使用:
using(new RebusTransactionContextDismantler())
{
// current transaction will never know....
await bus.Publish(whee);
}
最近,我们将 rebus 版本从 1 升级到 5,然后我们遇到了 Saga 处理程序的问题。现在我们没有从不同的订阅者那里得到正确顺序的响应。
我们有不同的来源来验证请求,为此我们有协调器来处理来自不同验证源的所有响应,但现在的问题是:所有 "SearchStarted" 消息并非来自所有验证的协调器首先到达源,因此我们无法检查有多少认证源开始认证。
尝试以不同的方式发送消息,例如 1. 使用 SEND 方法代替 REPLY。 2. 在发送响应之前尝试使用我们的 without await 关键字。 3. 尝试使用 Send/Reply 方法的 .Wait() 方法。
身份验证协调器:
public class AuthenticationSaga : Saga<AuthenticationSagaData>, IAmInitiatedBy<AuthenticationRequest>, IHandleMessages<SearchStarted>, IHandleMessages<SearchCompleted>, IHandleMessages<AuthenticationResponse>
{
private readonly IBus _bus;
public IBus Bus
{
get { return _bus; }
}
public AuthenticationSaga(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
if (!IsNew) return;
Data.Id = new Guid(MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId]);
Data.ReturnAddress = MessageContext.Current.Headers[Rebus.Messages.Headers.ReturnAddress];
message.UniqueId = Data.Id.ToString();
Data.RequestMessage = message;
Bus.Publish(message);
}
public async Task Handle(SearchStarted message)
{
}
public async Task Handle(SearchCompleted message)
{
}
public async Task Handle(AuthenticationResponse message)
{
}
protected override void CorrelateMessages(ICorrelationConfig<AuthenticationSagaData> config)
{
config.Correlate<AuthenticationRequest>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchStarted>(m => m.UniqueId, d => d.Id);
config.Correlate<AuthenticationResponse>(m => m.UniqueId, d => d.Id);
config.Correlate<SearchCompleted>(m => m.UniqueId, d => d.Id);
}
}
AuthenticationLdap:
public class AuthenticationLdapHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationLdapHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateLdap(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
AuthenticationNative:
public class AuthenticationNativeHandler : IHandleMessages
{
private readonly IBus _bus;
public IBus bus
{
get { return _bus; }
}
public AuthenticationNativeHandler(IBus bus)
{
_bus = bus;
}
public async Task Handle(AuthenticationRequest message)
{
await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
var response = AuthenticateNative(message); await bus.Reply(response);
await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });
}
}
我们期待 AuthenticationCoordinator 中的响应如下:
来自 Ldap 的 SearchStarted 消息
来自本机的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自本机的 AuthenticationResponse 消息
- 来自本机的 SearchCompleted 消息
但现在我们收到的响应顺序如下:
- 来自 Ldap 的 SearchStarted 消息
- 来自 Ldap 的 AuthenticationResponse 消息
- 来自 Ldap 的 SearchCompleted 消息
- 来自本机的 SearchStarted 消息
- 来自本机的 AuthenticationResponse 消息
- 来自本机的 SearchCompleted 消息
我们可以设置消息的优先级吗?我们如何才能在 rebus 5 中获得高于预期的响应。
您所看到的很可能是 Rebus 确保在您的处理程序完成执行后发送所有传出消息这一事实的结果。
它通过在其事务上下文中征集所有总线操作来做到这一点,只有在您的处理程序代码完成后才会提交。
这意味着代码像
public async Task Handle(string message)
{
await bus.Reply("this is message 1");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 2");
await Task.Delay(TimeSpan.FromSeconds(1));
await bus.Reply("this is message 3");
}
当提交 Rebus 事务上下文时,将导致同时发送消息 1、2 和 3,这意味着接收方将以随机顺序接收它们。
如果您要立即从您的处理程序发送消息,您可以 "dismantle" 像这样的事务上下文:
var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
// current transaction will never know....
await bus.Send(whee);
}
finally
{
AmbientTransactionContext.SetCurrent(transactionContext);
}
我建议您将其包含在 IDisposable
的实现中,这样可以像这样使用:
using(new RebusTransactionContextDismantler())
{
// current transaction will never know....
await bus.Publish(whee);
}