Rebus Saga:当订阅者发送给发布者时,消息未按正确的顺序到达

Rebus Saga: Message is not arrived in correct sequence when subscriber(s) sent to publisher

最近,我们将 rebus 版本从 1 升级到 5,然后我们遇到了 Saga 处理程序的问题。现在我们没有从不同的订阅者那里得到正确顺序的响应。

我们有不同的来源来验证请求,为此我们有协调器来处理来自不同验证源的所有响应,但现在的问题是:所有 "SearchStarted" 消息并非来自所有验证的协调器首先到达源,因此我们无法检查有多少认证源开始认证。

尝试以不同的方式发送消息,例如 1. 使用 SEND 方法代替 REPLY。 2. 在发送响应之前尝试使用我们的 without await 关键字。 3. 尝试使用 Send/Reply 方法的 .Wait() 方法。

身份验证协调器:

public class AuthenticationSaga : Saga<AuthenticationSagaData>, IAmInitiatedBy<AuthenticationRequest>, IHandleMessages<SearchStarted>, IHandleMessages<SearchCompleted>, IHandleMessages<AuthenticationResponse>
{
    private readonly IBus _bus;

    public IBus Bus
    {
        get { return _bus; }
    }

    public AuthenticationSaga(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        if (!IsNew) return;

        Data.Id = new Guid(MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId]);
        Data.ReturnAddress = MessageContext.Current.Headers[Rebus.Messages.Headers.ReturnAddress];
        message.UniqueId = Data.Id.ToString();
        Data.RequestMessage = message;
        Bus.Publish(message);
    }

    public async Task Handle(SearchStarted message)
    {

    }

    public async Task Handle(SearchCompleted message)
    {

    }

    public async Task Handle(AuthenticationResponse message)
    {

    }

    protected override void CorrelateMessages(ICorrelationConfig<AuthenticationSagaData> config)
    {
        config.Correlate<AuthenticationRequest>(m => m.UniqueId, d => d.Id);
        config.Correlate<SearchStarted>(m => m.UniqueId, d => d.Id);
        config.Correlate<AuthenticationResponse>(m => m.UniqueId, d => d.Id);
        config.Correlate<SearchCompleted>(m => m.UniqueId, d => d.Id);
    }
}

AuthenticationLdap:

public class AuthenticationLdapHandler : IHandleMessages
{
    private readonly IBus _bus;

    public IBus bus
    {
        get { return _bus; }
    }

    public AuthenticationLdapHandler(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

        var response = AuthenticateLdap(message); await bus.Reply(response);

        await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

    }
}

AuthenticationNative:

public class AuthenticationNativeHandler : IHandleMessages
{
    private readonly IBus _bus;

    public IBus bus
    {
        get { return _bus; }
    }

    public AuthenticationNativeHandler(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(AuthenticationRequest message)
    {
        await bus.Reply(new SearchStarted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

        var response = AuthenticateNative(message); await bus.Reply(response);

        await bus.Reply(new SearchCompleted { MessageId = MessageContext.Current.Headers[Rebus.Messages.Headers.CorrelationId], UniqueId = message.UniqueId });

    }
}

我们期待 AuthenticationCoordinator 中的响应如下:

  1. 来自 Ldap 的 SearchStarted 消息

  2. 来自本机的 SearchStarted 消息

  3. 来自 Ldap 的 AuthenticationResponse 消息
  4. 来自 Ldap 的 SearchCompleted 消息
  5. 来自本机的 AuthenticationResponse 消息
  6. 来自本机的 SearchCompleted 消息

但现在我们收到的响应顺序如下:

  1. 来自 Ldap 的 SearchStarted 消息
  2. 来自 Ldap 的 AuthenticationResponse 消息
  3. 来自 Ldap 的 SearchCompleted 消息
  4. 来自本机的 SearchStarted 消息
  5. 来自本机的 AuthenticationResponse 消息
  6. 来自本机的 SearchCompleted 消息

我们可以设置消息的优先级吗?我们如何才能在 rebus 5 中获得高于预期的响应。

您所看到的很可能是 Rebus 确保在您的处理程序完成执行后发送所有传出消息这一事实的结果。

它通过在其事务上下文中征集所有总线操作来做到这一点,只有在您的处理程序代码完成后才会提交。

这意味着代码像

public async Task Handle(string message)
{
    await bus.Reply("this is message 1");

    await Task.Delay(TimeSpan.FromSeconds(1));

    await bus.Reply("this is message 2");

    await Task.Delay(TimeSpan.FromSeconds(1));

    await bus.Reply("this is message 3");
}

当提交 Rebus 事务上下文时,将导致同时发送消息 1、2 和 3,这意味着接收方将以随机顺序接收它们。

如果您要立即从您的处理程序发送消息,您可以 "dismantle" 像这样的事务上下文:

var transactionContext = AmbientTransactionContext.Current;
AmbientTransactionContext.SetCurrent(null);
try
{
    // current transaction will never know....
    await bus.Send(whee);
}
finally
{
    AmbientTransactionContext.SetCurrent(transactionContext);
}

我建议您将其包含在 IDisposable 的实现中,这样可以像这样使用:

using(new RebusTransactionContextDismantler())
{
    // current transaction will never know....
    await bus.Publish(whee);
}