操作需要时间时重试消息
Message being retried when operation takes time
我有一个使用 Azure ServiceBus 的消息传递系统,但我在其上使用了 Nimbus。我有一个端点向另一个端点发送命令,并且在某一时刻另一侧的处理程序 class 接收它,所以一切正常。
当操作需要时间(大约超过 20 秒左右)时,处理程序会收到 'another' 调用并显示相同的消息。看起来 Nimbus 正在重试已经由处理程序的另一个(甚至是相同的)实例处理的消息,我没有看到任何异常被抛出,我可以使用以下处理程序轻松重现:
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
我的问题是:如何禁用此行为?我很高兴交易需要时间,因为这是一个繁重的过程,我已经从我的网站上卸载了,这首先是采用这种架构的全部意义所在。
换句话说,我希望消息不会被另一个处理程序拾取,而另一个处理程序已经拾取并正在处理它,除非出现异常并且消息返回队列并最终被拾取等待重试。
有什么办法吗?有什么我想念的吗?
我认为您正在寻找代码:http://www.uglybugger.org/software/post/support_for_long_running_handlers_in_nimbus
默认情况下,ASB/WSB 会给您 30 秒的消息锁定。这个想法是你从队列的头部弹出一个 BrokeredMessage 但必须在锁定超时内 .Complete() 或 .Abandon() 该消息。
如果您不这样做,服务总线会假定您已经崩溃或以其他方式失败,并且它将return该消息发送到队列以进行重新处理。
您有两个选择:
1) 在您的处理程序上实施 ILongRunningHandler。 Nimbus 会关注剩余的锁定时间,并自动续订您的消息锁定。注意:无论您更新多少次,ASB/WSB 支持的最长消息锁定时间为五分钟,因此如果您的处理程序花费的时间比这更长,那么您可能需要选项 #2。
public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
2) 在您的处理程序中,调用 Task.Run(() => SomeService(yourMessage)) 和 return。如果您这样做,请注意依赖项的生命周期范围(如果您的处理程序需要任何依赖项)。如果您需要 IFoo,请依赖于 Func>(或等效项,具体取决于您的容器)并在您的处理任务中解决它。
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
private readonly Func<Owned<IFoo>> fooFunc;
public Synchronizer(Func<Owned<IFoo>> fooFunc)
{
_fooFunc = fooFunc;
}
public async Task Handle(RequestSynchronization synchronizeInfo)
{
// don't await!
Task.Run(() => {
using (var foo = _fooFunc())
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
});
}
}
我有一个使用 Azure ServiceBus 的消息传递系统,但我在其上使用了 Nimbus。我有一个端点向另一个端点发送命令,并且在某一时刻另一侧的处理程序 class 接收它,所以一切正常。
当操作需要时间(大约超过 20 秒左右)时,处理程序会收到 'another' 调用并显示相同的消息。看起来 Nimbus 正在重试已经由处理程序的另一个(甚至是相同的)实例处理的消息,我没有看到任何异常被抛出,我可以使用以下处理程序轻松重现:
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
我的问题是:如何禁用此行为?我很高兴交易需要时间,因为这是一个繁重的过程,我已经从我的网站上卸载了,这首先是采用这种架构的全部意义所在。
换句话说,我希望消息不会被另一个处理程序拾取,而另一个处理程序已经拾取并正在处理它,除非出现异常并且消息返回队列并最终被拾取等待重试。
有什么办法吗?有什么我想念的吗?
我认为您正在寻找代码:http://www.uglybugger.org/software/post/support_for_long_running_handlers_in_nimbus
默认情况下,ASB/WSB 会给您 30 秒的消息锁定。这个想法是你从队列的头部弹出一个 BrokeredMessage 但必须在锁定超时内 .Complete() 或 .Abandon() 该消息。
如果您不这样做,服务总线会假定您已经崩溃或以其他方式失败,并且它将return该消息发送到队列以进行重新处理。
您有两个选择:
1) 在您的处理程序上实施 ILongRunningHandler。 Nimbus 会关注剩余的锁定时间,并自动续订您的消息锁定。注意:无论您更新多少次,ASB/WSB 支持的最长消息锁定时间为五分钟,因此如果您的处理程序花费的时间比这更长,那么您可能需要选项 #2。
public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
2) 在您的处理程序中,调用 Task.Run(() => SomeService(yourMessage)) 和 return。如果您这样做,请注意依赖项的生命周期范围(如果您的处理程序需要任何依赖项)。如果您需要 IFoo,请依赖于 Func>(或等效项,具体取决于您的容器)并在您的处理任务中解决它。
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
private readonly Func<Owned<IFoo>> fooFunc;
public Synchronizer(Func<Owned<IFoo>> fooFunc)
{
_fooFunc = fooFunc;
}
public async Task Handle(RequestSynchronization synchronizeInfo)
{
// don't await!
Task.Run(() => {
using (var foo = _fooFunc())
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
});
}
}