公共交通传奇故障处理
Masstransit saga fault handling
我正在尝试使用 masstransit 和 azure sb 创建一个简单的 saga 错误处理场景,并且已经为接收者提供了以下工作。
传奇目前在其中一个步骤中抛出错误,并且在重试几次后按预期发送到 _error 队列。
根据在 saga 中导致错误的步骤,消息将由 Step1FaultConsumer 或 Step2FaultConsumer 处理。我想知道是否有更好的方法从一个位置处理整个 saga 中的错误,而不管导致错误的步骤如何。
如果可能,我正在尝试在不使用状态机的情况下执行此操作。
program.cs
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(new Uri("HostAddress"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 5;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "SharedAccessKey");
});
cfg.RequiresSession = true;
cfg.UseRetry(x => x.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)));
cfg.ReceiveEndpoint(host, "QueueName", e =>
{
e.RequiresSession = true;
e.Saga<SagaConsumer>(new MessageSessionSagaRepository<SagaConsumer>());
e.Consumer<Step1FaultConsumer>();
e.Consumer<Step2FaultConsumer>();
});
});
saga.cs
public class SagaConsumer:
ISaga,
InitiatedBy<ISagaStep1>,
Orchestrates<ISagaStep2>
{
public Guid CorrelationId { get; set; }
public async Task Consume(ConsumeContext<ISagaStep1> context)
{
if(context.Message.Input1 == "000")
throw new Exception("Saga Step 1 Exception");
await context.Send<ISagaStep2>(context.DestinationAddress, new { Input2 = $"2-{context.Message.Input1}" });
}
public async Task Consume(ConsumeContext<ISagaStep2> context)
{
throw new Exception("Saga Step 2 Exception");
}
}
public class Step1FaultConsumer:
IConsumer<Fault<ISagaStep1>>
{
public async Task Consume(ConsumeContext<Fault<ISagaStep1>> context)
{
// Handle Error
}
}
public class Step2FaultConsumer:
IConsumer<Fault<ISagaStep2>>
{
public async Task Consume(ConsumeContext<Fault<ISagaStep2>> context)
{
// Handle Error
}
}
您可以更新您的使用者以通过两个 IConsumer<T>
接口和两个 Consume 方法来处理这两种故障类型。
我还建议为错误消费者使用单独的 queue/receive-endpoint,以避免阻塞 saga 端点上的吞吐量。
我正在尝试使用 masstransit 和 azure sb 创建一个简单的 saga 错误处理场景,并且已经为接收者提供了以下工作。
传奇目前在其中一个步骤中抛出错误,并且在重试几次后按预期发送到 _error 队列。
根据在 saga 中导致错误的步骤,消息将由 Step1FaultConsumer 或 Step2FaultConsumer 处理。我想知道是否有更好的方法从一个位置处理整个 saga 中的错误,而不管导致错误的步骤如何。
如果可能,我正在尝试在不使用状态机的情况下执行此操作。
program.cs
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
var host = cfg.Host(new Uri("HostAddress"),
h =>
{
h.TransportType = TransportType.AmqpWebSockets;
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.RetryLimit = 5;
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "SharedAccessKey");
});
cfg.RequiresSession = true;
cfg.UseRetry(x => x.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)));
cfg.ReceiveEndpoint(host, "QueueName", e =>
{
e.RequiresSession = true;
e.Saga<SagaConsumer>(new MessageSessionSagaRepository<SagaConsumer>());
e.Consumer<Step1FaultConsumer>();
e.Consumer<Step2FaultConsumer>();
});
});
saga.cs
public class SagaConsumer:
ISaga,
InitiatedBy<ISagaStep1>,
Orchestrates<ISagaStep2>
{
public Guid CorrelationId { get; set; }
public async Task Consume(ConsumeContext<ISagaStep1> context)
{
if(context.Message.Input1 == "000")
throw new Exception("Saga Step 1 Exception");
await context.Send<ISagaStep2>(context.DestinationAddress, new { Input2 = $"2-{context.Message.Input1}" });
}
public async Task Consume(ConsumeContext<ISagaStep2> context)
{
throw new Exception("Saga Step 2 Exception");
}
}
public class Step1FaultConsumer:
IConsumer<Fault<ISagaStep1>>
{
public async Task Consume(ConsumeContext<Fault<ISagaStep1>> context)
{
// Handle Error
}
}
public class Step2FaultConsumer:
IConsumer<Fault<ISagaStep2>>
{
public async Task Consume(ConsumeContext<Fault<ISagaStep2>> context)
{
// Handle Error
}
}
您可以更新您的使用者以通过两个 IConsumer<T>
接口和两个 Consume 方法来处理这两种故障类型。
我还建议为错误消费者使用单独的 queue/receive-endpoint,以避免阻塞 saga 端点上的吞吐量。