在不妨碍重试机制的情况下从异常捕获块发布 Rebus 消息
Publishing the Rebus mesage from exception catch block without hampering retry mechanism
我正在尝试从 Rebus 消息处理程序中的 catch 块发布消息,而不影响 rebus 的重试机制。
我的意图是,
- 在消息处理程序中处理消息。
- 如果发生错误,捕获它,并使用相同的总线发布一些错误事件。
- 发布后立即“抛出”捕获的异常,以便为消息的 retry/re-delivery 自动放置 rebus ACK/NACK。
我无法实现上述目标,因为如果 rebus 消息处理程序抛出异常,该消息会自动标记为重新传递,并且回滚整个管道事务。这否定了上面的第二点,因为当事务被回滚时,我发送的要发布的消息也被回滚。有没有一种方法可以实现这一点,即发布消息以及自动重试能力。我的消息处理程序代码如下。
public Task Handle(SomeMessage message)
{
try
{
//Some code that may result in an error
}
catch (Exception ex)
{
_bus.PublishSomeMessageErrorEvent(ex);
// throw an error and let Rebus retry the delivery.
throw;
}
return Task.CompletedTask;
}
我还尝试使用二级重试,这样当失败的消息进入 IHandleMessages 处理程序时,我只需使用 [= 发布消息38=](...) 方法但是在设置二级重试时我在启动总线时收到异常。
_bus = Configure.With(...)
.Options(r=>r.SimpleRetryStrategy(secondLevelRetriesEnabled:true))
.ConfigureSqlServerTransportFromAppConfig()
.Logging(c => c.Log4Net())
.Start();
异常
已尝试注册主要 -> Rebus.Retry.Simple.SimpleRetryStrategySettings,但主要注册已存在:主要 -> Rebus.Retry.Simple.SimpleRetryStrategySettings
您可以像这样创建一个范围来使用 Rebus 的内置“事务范围抑制器”:
using (new RebusTransactionScopeSuppressor())
{
// bus operations in here will not be enlisted
// in the transaction scope of the message
// context (i.e. the one associated with the
// handler)
}
所以您的消息处理程序可以简单地去
public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
readonly IBus _bus;
public SomeMessageHandler(IBus bus) => _bus = bus ?? throw new ArgumentNullException(nameof(bus));
public async Task Handle(SomeMessage message)
{
try
{
await DoSomethingThatCanThrow();
}
catch (Exception exception)
{
// publish event
using (new RebusTransactionScopeSuppressor())
{
var evt = new CaughtThisException(exception.ToString());
await _bus.Publish(evt);
}
// rethrow to have Rebus exception handling kick in
throw;
}
}
async Task DoSomethingThatCanThrow()
{
// (...)
}
}
并实现你想要的。
PS:记住要 await
异步的事情并不清楚 PublishSomeMessageErrorEvent
是同步还是异步,但不知何故你的代码看起来有点像它实际上可能是异步的。
我正在尝试从 Rebus 消息处理程序中的 catch 块发布消息,而不影响 rebus 的重试机制。
我的意图是,
- 在消息处理程序中处理消息。
- 如果发生错误,捕获它,并使用相同的总线发布一些错误事件。
- 发布后立即“抛出”捕获的异常,以便为消息的 retry/re-delivery 自动放置 rebus ACK/NACK。
我无法实现上述目标,因为如果 rebus 消息处理程序抛出异常,该消息会自动标记为重新传递,并且回滚整个管道事务。这否定了上面的第二点,因为当事务被回滚时,我发送的要发布的消息也被回滚。有没有一种方法可以实现这一点,即发布消息以及自动重试能力。我的消息处理程序代码如下。
public Task Handle(SomeMessage message)
{
try
{
//Some code that may result in an error
}
catch (Exception ex)
{
_bus.PublishSomeMessageErrorEvent(ex);
// throw an error and let Rebus retry the delivery.
throw;
}
return Task.CompletedTask;
}
我还尝试使用二级重试,这样当失败的消息进入 IHandleMessages
_bus = Configure.With(...)
.Options(r=>r.SimpleRetryStrategy(secondLevelRetriesEnabled:true))
.ConfigureSqlServerTransportFromAppConfig()
.Logging(c => c.Log4Net())
.Start();
异常 已尝试注册主要 -> Rebus.Retry.Simple.SimpleRetryStrategySettings,但主要注册已存在:主要 -> Rebus.Retry.Simple.SimpleRetryStrategySettings
您可以像这样创建一个范围来使用 Rebus 的内置“事务范围抑制器”:
using (new RebusTransactionScopeSuppressor())
{
// bus operations in here will not be enlisted
// in the transaction scope of the message
// context (i.e. the one associated with the
// handler)
}
所以您的消息处理程序可以简单地去
public class SomeMessageHandler : IHandleMessages<SomeMessage>
{
readonly IBus _bus;
public SomeMessageHandler(IBus bus) => _bus = bus ?? throw new ArgumentNullException(nameof(bus));
public async Task Handle(SomeMessage message)
{
try
{
await DoSomethingThatCanThrow();
}
catch (Exception exception)
{
// publish event
using (new RebusTransactionScopeSuppressor())
{
var evt = new CaughtThisException(exception.ToString());
await _bus.Publish(evt);
}
// rethrow to have Rebus exception handling kick in
throw;
}
}
async Task DoSomethingThatCanThrow()
{
// (...)
}
}
并实现你想要的。
PS:记住要 await
异步的事情并不清楚 PublishSomeMessageErrorEvent
是同步还是异步,但不知何故你的代码看起来有点像它实际上可能是异步的。