Rebus Sagas、修订和 DeferredMessages
Rebus Sagas, Revisions and DeferredMessages
我正在尝试配置按以下方式工作的 Saga:
- Saga 收到装运订单消息。该运输订单有一个 RouteId 属性,我可以使用它来关联相同 "truck"
的运输订单
- 这些运输订单是由另一个系统创建的,可以使用批处理来发送这些订单。但是,此系统无法将同一地址的运输订单分组。
- 几秒钟后,我发送了另一条仅包含此 RouteId 的消息。我需要获取接收到的 RouteId 的所有运输订单,按地址对它们进行分组,然后将其转换为另一个对象并发送到另一个 Web 服务。
但我面临两个问题:
- 如果我将两条消息 "at the same time" 发送到第一个处理程序,每条消息都会出现,即使具有与该消息相关的属性,IsNew 属性 在处理完第一条消息后也不会更改
- 在第二个处理程序中,我希望访问与那些 Saga 相关的所有数据,但我不能,因为数据似乎是那些消息的修订中的数据被推迟了。
相关代码:
saga 的总线配置
Bus = Configure.With(Activator)
.Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
.Logging(l => l.ColoredConsole())
.Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
.Sagas(s => {
s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
if (enforceExclusiveAccess)
{
s.EnforceExclusiveAccess();
}
})
.Options(o =>
{
if (maxDegreeOfParallelism > 0)
{
o.SetMaxParallelism(maxDegreeOfParallelism);
}
if (maxNumberOfWorkers > 0)
{
o.SetNumberOfWorkers(maxNumberOfWorkers);
}
})
.Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
.Start();
佐贺数据class:
public class RouteListSagaData : ISagaData
{
public Guid Id { get; set; }
public int Revision { get; set; }
private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();
public long RoutePlanId { get; set; }
public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
public bool SentToLisa { get; set; }
public void AddShippingActivity(LisaShippingActivity shippingActivity)
{
if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
{
_shippingActivities.Add(shippingActivity);
}
}
public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}
CorrelateMessages 方法
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}
假设启动 Saga 的消息句柄,如果 saga IsNew 则发送 DefferedMessage
public async Task Handle(ShippingOrder message)
{
try
{
var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);
if (Data.ShippingActivities.Contains(lisaActivity))
return;
Data.RoutePlanId = message.RoutePlanId;
Data.AddShippingActivity(lisaActivity);
var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);
if (IsNew)
{
await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
}
}
catch (Exception err)
{
Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
throw;
}
}
最后,延迟消息的处理程序:
public Task Handle(VerifyRouteListIsComplete message)
{
try
{
if (!Data.SentToLisa)
{
var lisaData = Data.GroupShippingActivitiesToLisaActivities();
_lisaService.SyncRouteList(lisaData).Wait();
Data.SentToLisa = true;
}
MarkAsComplete();
return Task.CompletedTask;
}
catch (Exception err)
{
Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
_serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
MarkAsUnchanged();
return Task.CompletedTask;
}
}
感谢任何帮助!
我不确定我是否正确理解了您所遇到的症状。
If I send two messages "at the same time" to the first handler, each one comes and even with properties that correlate that messages, the IsNew property not changes after the first message processed
如果调用 EnforceExclusiveAccess
,我希望以串行方式处理消息,第一个使用 IsNew == true
,第二个使用 IsNew == false
。
如果不是,我希望两条消息都与 IsNew == true
并行处理,但是然后 – 当插入 sage 数据时 – 我希望其中一条成功,另一条失败ConcurrencyException
.
在 ConcurrencyException
之后,将再次处理消息,这次是 IsNew == false
。
这不是你正在经历的吗?
In the second handler, I wish to access all data related to those Saga, but I can't because the data seems to be the data as it was in the revision of those messages was deferred.
您是说 saga 数据中的数据似乎处于 VerifyRouteListIsComplete
消息被延迟时的状态?
这听起来很奇怪,也不太可能你能不能再试一次,看看是否真的如此?
更新:我发现您遇到这种奇怪行为的原因:您不小心将 saga 处理程序实例设置为跨消息重复使用。
您是这样注册的(警告:不要这样做!):
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);
_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);
然后 Subscribe
方法在 BuiltinHandlerActivator
上进行此调用(警告:不要这样做!):
activator.Register(() => handlerInstance);
这是不好的原因(尤其是对于 saga 处理程序),是因为处理程序实例本身是有状态的——它有一个 Data
属性 包含进程的当前状态,并且这还包括 IsNew
属性.
您应该始终做的是确保每次收到消息时都创建一个新的处理程序实例 – 您的代码应该更改为如下所示:
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
把Subscribe
的实现改成这样就可以了:
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
_activator.Register((bus, context) => getHandler());
await _activator.Bus.Subscribe<T>();
}
这将解决您的独占访问问题:)
您的代码还有另一个问题:在注册您的处理程序和启动订阅者总线实例之间存在潜在的竞争条件,因为理论上您可能会不幸地在总线启动和您注册您的订阅者之间开始接收消息处理程序。
您应该更改代码以确保在启动总线(并因此开始接收消息)之前注册所有处理程序。
我正在尝试配置按以下方式工作的 Saga:
- Saga 收到装运订单消息。该运输订单有一个 RouteId 属性,我可以使用它来关联相同 "truck" 的运输订单
- 这些运输订单是由另一个系统创建的,可以使用批处理来发送这些订单。但是,此系统无法将同一地址的运输订单分组。
- 几秒钟后,我发送了另一条仅包含此 RouteId 的消息。我需要获取接收到的 RouteId 的所有运输订单,按地址对它们进行分组,然后将其转换为另一个对象并发送到另一个 Web 服务。
但我面临两个问题:
- 如果我将两条消息 "at the same time" 发送到第一个处理程序,每条消息都会出现,即使具有与该消息相关的属性,IsNew 属性 在处理完第一条消息后也不会更改
- 在第二个处理程序中,我希望访问与那些 Saga 相关的所有数据,但我不能,因为数据似乎是那些消息的修订中的数据被推迟了。
相关代码:
saga 的总线配置
Bus = Configure.With(Activator)
.Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
.Logging(l => l.ColoredConsole())
.Routing(r => r.TypeBased().MapAssemblyOf<IEventContract(publisherQueue))
.Sagas(s => {
s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
if (enforceExclusiveAccess)
{
s.EnforceExclusiveAccess();
}
})
.Options(o =>
{
if (maxDegreeOfParallelism > 0)
{
o.SetMaxParallelism(maxDegreeOfParallelism);
}
if (maxNumberOfWorkers > 0)
{
o.SetNumberOfWorkers(maxNumberOfWorkers);
}
})
.Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
.Start();
佐贺数据class:
public class RouteListSagaData : ISagaData
{
public Guid Id { get; set; }
public int Revision { get; set; }
private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();
public long RoutePlanId { get; set; }
public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
public bool SentToLisa { get; set; }
public void AddShippingActivity(LisaShippingActivity shippingActivity)
{
if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
{
_shippingActivities.Add(shippingActivity);
}
}
public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}
CorrelateMessages 方法
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}
假设启动 Saga 的消息句柄,如果 saga IsNew 则发送 DefferedMessage
public async Task Handle(ShippingOrder message)
{
try
{
var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);
if (Data.ShippingActivities.Contains(lisaActivity))
return;
Data.RoutePlanId = message.RoutePlanId;
Data.AddShippingActivity(lisaActivity);
var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);
if (IsNew)
{
await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
}
}
catch (Exception err)
{
Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
throw;
}
}
最后,延迟消息的处理程序:
public Task Handle(VerifyRouteListIsComplete message)
{
try
{
if (!Data.SentToLisa)
{
var lisaData = Data.GroupShippingActivitiesToLisaActivities();
_lisaService.SyncRouteList(lisaData).Wait();
Data.SentToLisa = true;
}
MarkAsComplete();
return Task.CompletedTask;
}
catch (Exception err)
{
Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
_serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
MarkAsUnchanged();
return Task.CompletedTask;
}
}
感谢任何帮助!
我不确定我是否正确理解了您所遇到的症状。
If I send two messages "at the same time" to the first handler, each one comes and even with properties that correlate that messages, the IsNew property not changes after the first message processed
如果调用 EnforceExclusiveAccess
,我希望以串行方式处理消息,第一个使用 IsNew == true
,第二个使用 IsNew == false
。
如果不是,我希望两条消息都与 IsNew == true
并行处理,但是然后 – 当插入 sage 数据时 – 我希望其中一条成功,另一条失败ConcurrencyException
.
在 ConcurrencyException
之后,将再次处理消息,这次是 IsNew == false
。
这不是你正在经历的吗?
In the second handler, I wish to access all data related to those Saga, but I can't because the data seems to be the data as it was in the revision of those messages was deferred.
您是说 saga 数据中的数据似乎处于 VerifyRouteListIsComplete
消息被延迟时的状态?
这听起来很奇怪,也不太可能你能不能再试一次,看看是否真的如此?
更新:我发现您遇到这种奇怪行为的原因:您不小心将 saga 处理程序实例设置为跨消息重复使用。
您是这样注册的(警告:不要这样做!):
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);
_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);
然后 Subscribe
方法在 BuiltinHandlerActivator
上进行此调用(警告:不要这样做!):
activator.Register(() => handlerInstance);
这是不好的原因(尤其是对于 saga 处理程序),是因为处理程序实例本身是有状态的——它有一个 Data
属性 包含进程的当前状态,并且这还包括 IsNew
属性.
您应该始终做的是确保每次收到消息时都创建一个新的处理程序实例 – 您的代码应该更改为如下所示:
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
把Subscribe
的实现改成这样就可以了:
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
_activator.Register((bus, context) => getHandler());
await _activator.Bus.Subscribe<T>();
}
这将解决您的独占访问问题:)
您的代码还有另一个问题:在注册您的处理程序和启动订阅者总线实例之间存在潜在的竞争条件,因为理论上您可能会不幸地在总线启动和您注册您的订阅者之间开始接收消息处理程序。
您应该更改代码以确保在启动总线(并因此开始接收消息)之前注册所有处理程序。