重建读取端数据库时事件乱序
Events Out of Order While Rebuilding Read Side DB
我有一个事件源系统,我现在正在为其实现一个端点,它将从事件存储事件重建读取端数据存储。但是,我现在 运行 陷入似乎与我处理事件的方式有关的并发问题。
我决定在重建期间使用我的事件处理程序代码来处理事件。在系统的正常状态下(不是读取端数据库重建),我的事件处理程序侦听它们订阅的事件并相应地更新预测。但是,当通过它们的事件处理程序在线处理这些事件时,我在读取端 DB 最终状态中看到不一致的结果(如果它甚至到达那里,有时却没有)。 我想这意味着它们执行顺序不对。
我不应该以这种方式使用事件处理程序吗?我想既然我正在处理事件,那么以这种方式重用事件处理程序是非常合适的。
我正在使用 MediatR 进行在线消息传递。所有事件处理程序都实现 INotificationHandler
.
这是代码示例:
IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
if (e.Version != eventNumber + 1)
throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");
var ev = e as Event;
// publish different historic events to event handlers which work with read DBs
switch (e.Type)
{
case EventType.WarehouseCreated:
WarehouseCreated w = new WarehouseCreated(ev);
await _mediator.Publish(w);
break;
case EventType.BoxCreated:
BoxCreated b = new BoxCreated(ev);
await _mediator.Publish(b);
break;
case EventType.BoxLocationChanged:
BoxLocationChanged l = new BoxLocationChanged(ev);
await _mediator.Publish(l);
break;
}
eventNumber++;
}
我已经尝试将 await
关键字替换为对 Wait()
的调用。
类似于 _mediator.Publish(bcc).Wait()
。
但这似乎不是一个好主意,因为这背后有异步代码。它也没有用..
我也尝试过对事件版本进行排队,并让事件类型案例等到它们的版本位于队列顶部后再发布事件。
类似于:
case EventType.BoxContentsChanged:
BoxContentsChanged bcc = new BoxContentsChanged(ev);
while (eventQueue.Peek() != bcc.Version)
continue;
await _mediator.Publish(bcc);
eventQueue.Dequeue();
break;
这也没有用。
无论如何 - 如果有人对如何处理这个问题有任何想法,我将不胜感激。我不想以同步方式复制所有异步事件处理程序代码。
我想最好的方法是同步执行,以确保一致性。这需要我在重播服务中复制一些事件处理程序逻辑并创建同步存储库方法。不过现在没有比赛条件,这很好。
我有一个事件源系统,我现在正在为其实现一个端点,它将从事件存储事件重建读取端数据存储。但是,我现在 运行 陷入似乎与我处理事件的方式有关的并发问题。
我决定在重建期间使用我的事件处理程序代码来处理事件。在系统的正常状态下(不是读取端数据库重建),我的事件处理程序侦听它们订阅的事件并相应地更新预测。但是,当通过它们的事件处理程序在线处理这些事件时,我在读取端 DB 最终状态中看到不一致的结果(如果它甚至到达那里,有时却没有)。 我想这意味着它们执行顺序不对。
我不应该以这种方式使用事件处理程序吗?我想既然我正在处理事件,那么以这种方式重用事件处理程序是非常合适的。
我正在使用 MediatR 进行在线消息传递。所有事件处理程序都实现 INotificationHandler
.
这是代码示例:
IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
if (e.Version != eventNumber + 1)
throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");
var ev = e as Event;
// publish different historic events to event handlers which work with read DBs
switch (e.Type)
{
case EventType.WarehouseCreated:
WarehouseCreated w = new WarehouseCreated(ev);
await _mediator.Publish(w);
break;
case EventType.BoxCreated:
BoxCreated b = new BoxCreated(ev);
await _mediator.Publish(b);
break;
case EventType.BoxLocationChanged:
BoxLocationChanged l = new BoxLocationChanged(ev);
await _mediator.Publish(l);
break;
}
eventNumber++;
}
我已经尝试将 await
关键字替换为对 Wait()
的调用。
类似于 _mediator.Publish(bcc).Wait()
。
但这似乎不是一个好主意,因为这背后有异步代码。它也没有用..
我也尝试过对事件版本进行排队,并让事件类型案例等到它们的版本位于队列顶部后再发布事件。
类似于:
case EventType.BoxContentsChanged:
BoxContentsChanged bcc = new BoxContentsChanged(ev);
while (eventQueue.Peek() != bcc.Version)
continue;
await _mediator.Publish(bcc);
eventQueue.Dequeue();
break;
这也没有用。
无论如何 - 如果有人对如何处理这个问题有任何想法,我将不胜感激。我不想以同步方式复制所有异步事件处理程序代码。
我想最好的方法是同步执行,以确保一致性。这需要我在重播服务中复制一些事件处理程序逻辑并创建同步存储库方法。不过现在没有比赛条件,这很好。