Lagom 中的事件标签是如何工作的?
How does event tagging work in Lagom?
Lagom reference documentation 显示如何标记事件:
object BlogEvent {
val BlogEventTag = AggregateEventTag[BlogEvent]
}
sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
override def aggregateTag: AggregateEventTag[BlogEvent] =
BlogEvent.BlogEventTag
}
sealed trait 建议可以标记父事件以使所有子事件按顺序处理:
All events with a particular tag can be consumed as a sequential,
ordered stream of events.
所以我们这样做了,我们标记了我们的父事件,我们使用 slick 实现了一个 ReadSideProcessor 但它没有用。增加日志记录级别,我们看到 "unhandled message" 并且我们在 SlickReadSideImpl
中发现以下内容:
override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>
val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
// apply handler if found
handler(element)
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever it happens we save the offset
offsetDao.updateOffsetQuery(element.offset)
}
.map(_ => Done)
slick.db.run(dbAction.transactionally)
}
如果 class 与注册处理程序的 class 不完全匹配,则上面的 eventHandlers.get(element.event.getClass)
无法找到任何处理程序,例如它是一个子 class(我们的案例)。
这有点令人困惑:这是期望的行为还是 JDBCReadSideImpl
和 SlickReadSideImpl
实现中的错误?
- 如果这是所需的行为,则不应标记密封特征事件(并且可能需要在文档中更新)
- 如果是所需的行为,则 JDBCReadSideImpl 和 SlickReadSideImpl 不能使用从 classname 到处理程序的映射。
这是按预期工作的。预计您将希望以不同的方式处理具体事件类型,因此您将为每种具体类型注册一个处理程序。
用相同标签标记所有事件子类型的目的是确保同一实体上不同类型事件之间的顺序。例如,如果你有一个 BlogCreatedEvent
和一个 BlogPublishedEvent
那么你会希望确保你的处理器在发布的事件之前接收到创建的事件,即使它以不同的方式处理它们。
Lagom reference documentation 显示如何标记事件:
object BlogEvent {
val BlogEventTag = AggregateEventTag[BlogEvent]
}
sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
override def aggregateTag: AggregateEventTag[BlogEvent] =
BlogEvent.BlogEventTag
}
sealed trait 建议可以标记父事件以使所有子事件按顺序处理:
All events with a particular tag can be consumed as a sequential, ordered stream of events.
所以我们这样做了,我们标记了我们的父事件,我们使用 slick 实现了一个 ReadSideProcessor 但它没有用。增加日志记录级别,我们看到 "unhandled message" 并且我们在 SlickReadSideImpl
中发现以下内容:
override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
Flow[EventStreamElement[Event]]
.mapAsync(parallelism = 1) { element =>
val dbAction = eventHandlers.get(element.event.getClass)
.map { handler =>
// apply handler if found
handler(element)
}
.getOrElse {
// fallback to empty action if no handler is found
if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
DBIO.successful(())
}
.flatMap { _ =>
// whatever it happens we save the offset
offsetDao.updateOffsetQuery(element.offset)
}
.map(_ => Done)
slick.db.run(dbAction.transactionally)
}
如果 class 与注册处理程序的 class 不完全匹配,则上面的 eventHandlers.get(element.event.getClass)
无法找到任何处理程序,例如它是一个子 class(我们的案例)。
这有点令人困惑:这是期望的行为还是 JDBCReadSideImpl
和 SlickReadSideImpl
实现中的错误?
- 如果这是所需的行为,则不应标记密封特征事件(并且可能需要在文档中更新)
- 如果是所需的行为,则 JDBCReadSideImpl 和 SlickReadSideImpl 不能使用从 classname 到处理程序的映射。
这是按预期工作的。预计您将希望以不同的方式处理具体事件类型,因此您将为每种具体类型注册一个处理程序。
用相同标签标记所有事件子类型的目的是确保同一实体上不同类型事件之间的顺序。例如,如果你有一个 BlogCreatedEvent
和一个 BlogPublishedEvent
那么你会希望确保你的处理器在发布的事件之前接收到创建的事件,即使它以不同的方式处理它们。