操作需要时间时重试消息

Message being retried when operation takes time

我有一个使用 Azure ServiceBus 的消息传递系统,但我在其上使用了 Nimbus。我有一个端点向另一个端点发送命令,并且在某一时刻另一侧的处理程序 class 接收它,所以一切正常。

当操作需要时间(大约超过 20 秒左右)时,处理程序会收到 'another' 调用并显示相同的消息。看起来 Nimbus 正在重试已经由处理程序的另一个(甚至是相同的)实例处理的消息,我没有看到任何异常被抛出,我可以使用以下处理程序轻松重现:

public class Synchronizer : IHandleCommand<RequestSynchronization>
{
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        Console.WriteLine("Received Synchronization");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process

        Console.WriteLine("Got through first timeout");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process

        Console.WriteLine("Got through second timeout");
    }
}

我的问题是:如何禁用此行为?我很高兴交易需要时间,因为这是一个繁重的过程,我已经从我的网站上卸载了,这首先是采用这种架构的全部意义所在。

换句话说,我希望消息不会被另一个处理程序拾取,而另一个处理程序已经拾取并正在处理它,除非出现异常并且消息返回队列并最终被拾取等待重试。

有什么办法吗?有什么我想念的吗?

我认为您正在寻找代码:http://www.uglybugger.org/software/post/support_for_long_running_handlers_in_nimbus

默认情况下,ASB/WSB 会给您 30 秒的消息锁定。这个想法是你从队列的头部弹出一个 BrokeredMessage 但必须在锁定超时内 .Complete() 或 .Abandon() 该消息。

如果您不这样做,服务总线会假定您已经崩溃或以其他方式失败,并且它将return该消息发送到队列以进行重新处理。

您有两个选择:

1) 在您的处理程序上实施 ILongRunningHandler。 Nimbus 会关注剩余的锁定时间,并自动续订您的消息锁定。注意:无论您更新多少次,ASB/WSB 支持的最长消息锁定时间为五分钟,因此如果您的处理程序花费的时间比这更长,那么您可能需要选项 #2。

public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
{
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        Console.WriteLine("Received Synchronization");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process

        Console.WriteLine("Got through first timeout");

        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process

        Console.WriteLine("Got through second timeout");
    }
}

2) 在您的处理程序中,调用 Task.Run(() => SomeService(yourMessage)) 和 return。如果您这样做,请注意依赖项的生命周期范围(如果您的处理程序需要任何依赖项)。如果您需要 IFoo,请依赖于 Func>(或等效项,具体取决于您的容器)并在您的处理任务中解决它。

public class Synchronizer : IHandleCommand<RequestSynchronization>
{
    private readonly Func<Owned<IFoo>> fooFunc;

    public Synchronizer(Func<Owned<IFoo>> fooFunc)
    {
        _fooFunc = fooFunc;
    }

    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        // don't await!
        Task.Run(() => {
            using (var foo = _fooFunc())
            {
              Console.WriteLine("Received Synchronization");

              await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process

              Console.WriteLine("Got through first timeout");

              await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process

              Console.WriteLine("Got through second timeout");
            }

        });
    }
}