如何中止交易中的交易?
How to abort a transaction in rebus?
当我尝试中止消息处理程序中的事务时,我遇到了困难。我正在使用 RabbitMQ.
我的目标是实现以下行为:如果收到消息,我会尝试将其内容存储在硬盘上。如果失败,我想重新排队消息。通过这样做,我为同一服务的另一个实例提供了尝试相同服务的机会。
我想要的基本上是控制消息何时被 ACK
ed 或拒绝的可能性。
我查看了源代码,特别是 RabbitMqTransport.cs
,发现提交事务时会发送 ACK
。如果事务中止,则发送 NACK
。我曾经自己围绕 RabbitMQ 创建了一个包装器 class,因此我知道这是正确的。
但是似乎从未调用过 OnAborted
。即使我中止交易,也会调用 OnComitted
。
我使用以下代码中止交易。 context
是传递给 Messagehandler
的 IMessageContext
实例。
context.TransactionContext.OnAborted(() =>
{
Console.WriteLine("Abort");
});
context.TransactionContext.OnCommitted(async () =>
{
Console.WriteLine("Commit");
});
context.TransactionContext.Abort();
我也尝试了不同的变体,比如获取 AmbientTransactionContext
或使用 Rebus.TransactionScope
包,但没有效果。
您的 post 不清楚您是如何中止交易的。我们对 RabbitMq 使用类似的设置,我总是从处理程序内部抛出异常。
Rebus 旨在按照您的预期处理 ACK/NACK,具体取决于您的消息处理程序是否抛出异常。
换句话说,如果您的消息处理程序没有抛出异常,则认为消息传递成功,消息被确认。
另一方面,如果您的消息处理程序抛出异常,消息将被 NACKed(因此在 RabbitMQ 术语中其状态重置为 "Ready"),有效地使另一个实例可以抓取消息。
但是,由于 RabbitMQ 客户端驱动程序的设计方式,我认为该消息实际上不会返回到服务器以供其他实例接收 - 我认为(这只是一个猜测)事实上,驱动程序预取消息并将它们存储在内存队列中直到它们被使用导致消息被简单地在内部标记为重新传递。
因此,我希望同一个服务实例执行所有传递尝试,然后 - 正如您正确观察到的那样 - Rebus 会将消息移到错误队列中,从而安全地存储它以供处理稍后。
我希望这是有道理的:)
@mookid8000:
好吧,这很清楚,当消息被 ACK
ed 或 NACK
ed 时。
但是我能够反驳你的假设。我在绑定到同一输入队列的不同控制台应用程序中创建了两个消费者。其中之一总是抛出异常。发布者每 10 秒发送一次消息。我可以看到,其他消费者在故障抛出异常后处理消息。我看到了,因为消息的 ID 由 Rebus 记录在控制台中。
这回答了我的问题,但没有解决我的问题。就我而言,我真正想要的是让消息保留在队列中的可能性,直到其中一个服务实例能够处理它。原因是消息的顺序很重要。这可能是我方法中的一个根本错误,但现在我不想改变它。
有没有办法防止(某些)消息被移动到错误队列? Rebus中的二级重试机制是否可以实现?
请参阅以下源代码以供进一步参考:
普通消费者:
class TimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
Console.WriteLine(message.Time);
//await Program.Activator.Bus.Reply("Ja danke");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new TimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
// Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html
//bus.Unsubscribe<TimeEvent>();
}
}
}
故障消费者:
class FaultedTimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
throw new Exception("That should not have happened");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new FaultedTimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
出版商:
public static class PubSubTest
{
public static void Start()
{
Console.WriteLine("Starting PubSubTest");
using (var activator = new BuiltinHandlerActivator())
{
var bus = Configure
.With(activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50", "MessagingTest").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
.Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait());
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
26.07.17 更新:
我的二级重试结果:激活它们后,我通过推迟和重新发送它们来处理它们。通过这样做,我至少可以确保消息在稍后处理并且不会丢失。
public async Task Handle(IFailed<TimeEvent> failedMessage)
{
await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message);
}
这不是最佳解决方案:
- 我的消息顺序发生了变化:一条消息延迟后,从队列中消费下一条消息。
- Timeoutmanager 在内存中,我无法访问 SQL-Server。
我了解到可以通过 RabbitMQ 使用延迟消息。要么你利用 dead-letter-exchanges or a plugin.
将 maxDeliveryAttempts 增加到 Int32.MaxValue
非常脏,但它做了我想要的:它保留了消息,最重要的是消息在队列中的顺序。
我将此问题标记为已解决,因为问题 "How to abort a transaction in Rebus" 已得到回答。
当我尝试中止消息处理程序中的事务时,我遇到了困难。我正在使用 RabbitMQ.
我的目标是实现以下行为:如果收到消息,我会尝试将其内容存储在硬盘上。如果失败,我想重新排队消息。通过这样做,我为同一服务的另一个实例提供了尝试相同服务的机会。
我想要的基本上是控制消息何时被 ACK
ed 或拒绝的可能性。
我查看了源代码,特别是 RabbitMqTransport.cs
,发现提交事务时会发送 ACK
。如果事务中止,则发送 NACK
。我曾经自己围绕 RabbitMQ 创建了一个包装器 class,因此我知道这是正确的。
但是似乎从未调用过 OnAborted
。即使我中止交易,也会调用 OnComitted
。
我使用以下代码中止交易。 context
是传递给 Messagehandler
的 IMessageContext
实例。
context.TransactionContext.OnAborted(() =>
{
Console.WriteLine("Abort");
});
context.TransactionContext.OnCommitted(async () =>
{
Console.WriteLine("Commit");
});
context.TransactionContext.Abort();
我也尝试了不同的变体,比如获取 AmbientTransactionContext
或使用 Rebus.TransactionScope
包,但没有效果。
您的 post 不清楚您是如何中止交易的。我们对 RabbitMq 使用类似的设置,我总是从处理程序内部抛出异常。
Rebus 旨在按照您的预期处理 ACK/NACK,具体取决于您的消息处理程序是否抛出异常。
换句话说,如果您的消息处理程序没有抛出异常,则认为消息传递成功,消息被确认。
另一方面,如果您的消息处理程序抛出异常,消息将被 NACKed(因此在 RabbitMQ 术语中其状态重置为 "Ready"),有效地使另一个实例可以抓取消息。
但是,由于 RabbitMQ 客户端驱动程序的设计方式,我认为该消息实际上不会返回到服务器以供其他实例接收 - 我认为(这只是一个猜测)事实上,驱动程序预取消息并将它们存储在内存队列中直到它们被使用导致消息被简单地在内部标记为重新传递。
因此,我希望同一个服务实例执行所有传递尝试,然后 - 正如您正确观察到的那样 - Rebus 会将消息移到错误队列中,从而安全地存储它以供处理稍后。
我希望这是有道理的:)
@mookid8000:
好吧,这很清楚,当消息被 ACK
ed 或 NACK
ed 时。
但是我能够反驳你的假设。我在绑定到同一输入队列的不同控制台应用程序中创建了两个消费者。其中之一总是抛出异常。发布者每 10 秒发送一次消息。我可以看到,其他消费者在故障抛出异常后处理消息。我看到了,因为消息的 ID 由 Rebus 记录在控制台中。
这回答了我的问题,但没有解决我的问题。就我而言,我真正想要的是让消息保留在队列中的可能性,直到其中一个服务实例能够处理它。原因是消息的顺序很重要。这可能是我方法中的一个根本错误,但现在我不想改变它。
有没有办法防止(某些)消息被移动到错误队列? Rebus中的二级重试机制是否可以实现?
请参阅以下源代码以供进一步参考:
普通消费者:
class TimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
Console.WriteLine(message.Time);
//await Program.Activator.Bus.Reply("Ja danke");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new TimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
// Without the unsubscribe we have a durable subscriber, see http://www.enterpriseintegrationpatterns.com/patterns/messaging/DurableSubscription.html
//bus.Unsubscribe<TimeEvent>();
}
}
}
故障消费者:
class FaultedTimeEventHandler : IHandleMessages<TimeEvent>
{
public async Task Handle(TimeEvent message)
{
throw new Exception("That should not have happened");
}
}
class Program
{
public static BuiltinHandlerActivator Activator;
static void Main(string[] args)
{
using (Activator = new BuiltinHandlerActivator())
{
Activator.Register(() => new FaultedTimeEventHandler());
var bus = Configure
.With(Activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50",
$"ConsumerPrototype").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
bus.Subscribe<TimeEvent>();
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
出版商:
public static class PubSubTest
{
public static void Start()
{
Console.WriteLine("Starting PubSubTest");
using (var activator = new BuiltinHandlerActivator())
{
var bus = Configure
.With(activator)
.Transport(t => t.UseRabbitMq("amqp://guest:guest@192.168.3.50", "MessagingTest").Prefetch(1))
.Routing(r => r.TypeBased())
.Start();
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10))
.Subscribe(_ => bus.Publish(new TimeEvent(DateTime.Now)).Wait());
Console.WriteLine("Press enter to quit");
Console.ReadLine();
}
}
}
26.07.17 更新: 我的二级重试结果:激活它们后,我通过推迟和重新发送它们来处理它们。通过这样做,我至少可以确保消息在稍后处理并且不会丢失。
public async Task Handle(IFailed<TimeEvent> failedMessage)
{
await bus.Defer(TimeSpan.FromSeconds(30), failedMessage.Message);
}
这不是最佳解决方案:
- 我的消息顺序发生了变化:一条消息延迟后,从队列中消费下一条消息。
- Timeoutmanager 在内存中,我无法访问 SQL-Server。
我了解到可以通过 RabbitMQ 使用延迟消息。要么你利用 dead-letter-exchanges or a plugin.
将 maxDeliveryAttempts 增加到 Int32.MaxValue
非常脏,但它做了我想要的:它保留了消息,最重要的是消息在队列中的顺序。
我将此问题标记为已解决,因为问题 "How to abort a transaction in Rebus" 已得到回答。