Lagom 中 Message Broker 的完整示例

Full example of Message Broker in Lagom

我正在尝试实施使用 Lagom 1.2.2 设置的 Message Broker 并将 运行 嵌入墙中。该文档具有以下服务描述符示例:

default Descriptor descriptor() {
return named("helloservice").withCalls(...)
  // here we declare the topic(s) this service will publish to
  .publishing(
    topic("greetings", this::greetingsTopic)
  )
  ....;
}

此实施示例:

public Topic<GreetingMessage> greetingsTopic() {
return TopicProducer.singleStreamWithOffset(offset -> {
    return persistentEntityRegistry
        .eventStream(HelloEventTag.INSTANCE, offset)
        .map(this::convertEvent);
  });
}

但是,convertEvent() 函数的参数类型或 return 类型没有示例,这就是我画空白的地方。在另一端,MessageBroker 的订阅者,它似乎正在消耗 GreetingMessage 个对象,但是当我创建一个函数 convertEvent 到 return GreetingMessage 个对象时,我得到一个编译错误:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types;
  required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>
  found: this::convertEvent
  reason: cannot infer type-variable(s) T
    (argument mismatch; invalid method reference
  incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage)

是否有更详尽的示例来说明如何使用它?我已经查看了 Chirper 示例应用程序,它似乎没有这样的示例。

谢谢!

您粘贴的错误消息准确地告诉了您 map 的预期:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>

因此,您需要传递一个接受 Pair<GreetingEvent, Offset> 的函数。函数return应该是什么?好吧,更新它以接受它,然后你会得到下一个错误,它会再次告诉你它期望你做什么 return,在这种情况下你会发现它是 Pair<GreetingMessage, Offset> .

解释一下这些类型是什么——Lagom 需要跟踪哪些事件已发布到 Kafka,这样当您重新启动服务时,它不会从您的事件日志的开头重新发布所有事件时间再次开始。它通过使用偏移来做到这一点。所以事件日志会产生成对的事件和偏移量,然后你需要将这些事件转换为将发布到 Kafka 的消息,当你 returned 转换后的消息到 Lagom 时,它需要是一个 in与您从事件日志中获得的偏移量配对,以便在发布到 Kafka 后,Lagom 可以保留偏移量,并在下次重新启动服务时将其用作起点。

完整的例子可以在这里看到:https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91