从 Lagom/Akka Kafka 主题订阅者为 Websocket 创建源
Create Source from Lagom/Akka Kafka Topic Subscriber for Websocket
我希望我的 Lagom 订阅者专用服务订阅 Kafka 主题并将消息流式传输到 websocket。我使用此文档 (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) 作为指南定义了如下服务:
// service call
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
// service implementation
override def stream() = ServiceCall { req =>
req.runForeach(str => log.info(s"client: %str"))
kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
// add message to a Source and return Done
))
Future.successful(//some Source[String, NotUsed])
但是,我不太明白如何处理我的 kafka 消息。 Flow.fromFunction
returns [String, Done, _]
并暗示我需要将这些消息(字符串)添加到在订阅者外部创建的源中。
所以我的问题是双重的:
1) 如何创建 akka 流源以在运行时接收来自 kafka 主题订阅者的消息?
2) 如何在 Flow 中将 kafka 消息附加到所述源?
您似乎误解了 Lagom 的服务 API。如果您尝试从服务调用主体中实现流,则您的调用没有任何输入;即,
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
表示当客户端提供Source[String, NotUsed]
时,服务将以实物方式响应。您的客户没有直接提供这个;因此,您的签名应该是
def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]
现在回答你的问题...
这实际上不存在于 scala giter8 模板中,但是 java 版本包含他们所谓的 autonomous stream ,它可以大致完成您想要做的事情。
在 Scala 中,这段代码看起来像...
override def autonomousStream(): ServiceCall[
Source[String, NotUsed],
Source[String, NotUsed]
] = ServiceCall { hellos => Future {
hellos.mapAsync(8, ...)
}
}
由于您的调用没有映射到 input 流,而是映射到 kafka 主题,因此您需要执行以下操作:
override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall {
_ =>
Future {
kafkaTopic()
.subscribe
.atMostOnce
.mapAsync(...)
}
}
我希望我的 Lagom 订阅者专用服务订阅 Kafka 主题并将消息流式传输到 websocket。我使用此文档 (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) 作为指南定义了如下服务:
// service call
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
// service implementation
override def stream() = ServiceCall { req =>
req.runForeach(str => log.info(s"client: %str"))
kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
// add message to a Source and return Done
))
Future.successful(//some Source[String, NotUsed])
但是,我不太明白如何处理我的 kafka 消息。 Flow.fromFunction
returns [String, Done, _]
并暗示我需要将这些消息(字符串)添加到在订阅者外部创建的源中。
所以我的问题是双重的: 1) 如何创建 akka 流源以在运行时接收来自 kafka 主题订阅者的消息? 2) 如何在 Flow 中将 kafka 消息附加到所述源?
您似乎误解了 Lagom 的服务 API。如果您尝试从服务调用主体中实现流,则您的调用没有任何输入;即,
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
表示当客户端提供Source[String, NotUsed]
时,服务将以实物方式响应。您的客户没有直接提供这个;因此,您的签名应该是
def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]
现在回答你的问题...
这实际上不存在于 scala giter8 模板中,但是 java 版本包含他们所谓的 autonomous stream ,它可以大致完成您想要做的事情。
在 Scala 中,这段代码看起来像...
override def autonomousStream(): ServiceCall[
Source[String, NotUsed],
Source[String, NotUsed]
] = ServiceCall { hellos => Future {
hellos.mapAsync(8, ...)
}
}
由于您的调用没有映射到 input 流,而是映射到 kafka 主题,因此您需要执行以下操作:
override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall {
_ =>
Future {
kafkaTopic()
.subscribe
.atMostOnce
.mapAsync(...)
}
}