Lagom 中的事件标签是如何工作的?

How does event tagging work in Lagom?

Lagom reference documentation 显示如何标记事件:

object BlogEvent {
  val BlogEventTag = AggregateEventTag[BlogEvent]
}

sealed trait BlogEvent extends AggregateEvent[BlogEvent] {
  override def aggregateTag: AggregateEventTag[BlogEvent] =
    BlogEvent.BlogEventTag
}

sealed trait 建议可以标记父事件以使所有子事件按顺序处理:

All events with a particular tag can be consumed as a sequential, ordered stream of events.

所以我们这样做了,我们标记了我们的父事件,我们使用 slick 实现了一个 ReadSideProcessor 但它没有用。增加日志记录级别,我们看到 "unhandled message" 并且我们在 SlickReadSideImpl 中发现以下内容:

override def handle(): Flow[EventStreamElement[Event], Done, NotUsed] =
      Flow[EventStreamElement[Event]]
        .mapAsync(parallelism = 1) { element =>

          val dbAction = eventHandlers.get(element.event.getClass)
            .map { handler =>
              // apply handler if found
              handler(element)
            }
            .getOrElse {
              // fallback to empty action if no handler is found
              if (log.isDebugEnabled) log.debug("Unhandled event [{}]", element.event.getClass.getName)
              DBIO.successful(())
            }
            .flatMap { _ =>
              // whatever it happens we save the offset
              offsetDao.updateOffsetQuery(element.offset)
            }
            .map(_ => Done)

          slick.db.run(dbAction.transactionally)
        }

如果 class 与注册处理程序的 class 不完全匹配,则上面的 eventHandlers.get(element.event.getClass) 无法找到任何处理程序,例如它是一个子 class(我们的案例)。

这有点令人困惑:这是期望的行为还是 JDBCReadSideImplSlickReadSideImpl 实现中的错误?

这是按预期工作的。预计您将希望以不同的方式处理具体事件类型,因此您将为每种具体类型注册一个处理程序。

用相同标签标记所有事件子类型的目的是确保同一实体上不同类型事件之间的顺序。例如,如果你有一个 BlogCreatedEvent 和一个 BlogPublishedEvent 那么你会希望确保你的处理器在发布的事件之前接收到创建的事件,即使它以不同的方式处理它们。