Alpakka Akka Stream 无法从 kafka 读取
Alpakka Akka Stream unable to read from kafka
我已经基于 alpakka 项目构建了一个非常简单的 akka 流,但它没有从 kafka 读取任何内容,即使它连接并创建了一个消费者组。我为流创建了隐式 Actor System 和 Materializer。
val done = Consumer.committableSource(consumerSettings,
Subscriptions.topics(kafkaTopic))
.map(msg => msg.committableOffset)
.mapAsync(1) { offset =>
offset.commitScaladsl()
}
.runWith(Sink.ignore)
- [stream.actor.dispatcher] 将此消息发送到 KafkaConsumerActor "Requesting messages, requestId: 1, partitions: Set(kafka-topic-0)"
- KafkaConsumerActor 似乎没有收到消息,但是当主管要求 Actor 关闭时,它确实收到消息并关闭。
关于为什么它无法无错误或异常地读取 Kafka 的任何线索?
我不明白为什么我的 akka 流不使用来自 kafka 代理的消息,但是当我实现与 Runnable Graph 相同的流时,它起作用了。
我使用的示例 - https://www.programcreek.com/scala/akka.stream.scaladsl.RunnableGraph
我已经基于 alpakka 项目构建了一个非常简单的 akka 流,但它没有从 kafka 读取任何内容,即使它连接并创建了一个消费者组。我为流创建了隐式 Actor System 和 Materializer。
val done = Consumer.committableSource(consumerSettings,
Subscriptions.topics(kafkaTopic))
.map(msg => msg.committableOffset)
.mapAsync(1) { offset =>
offset.commitScaladsl()
}
.runWith(Sink.ignore)
- [stream.actor.dispatcher] 将此消息发送到 KafkaConsumerActor "Requesting messages, requestId: 1, partitions: Set(kafka-topic-0)"
- KafkaConsumerActor 似乎没有收到消息,但是当主管要求 Actor 关闭时,它确实收到消息并关闭。
关于为什么它无法无错误或异常地读取 Kafka 的任何线索?
我不明白为什么我的 akka 流不使用来自 kafka 代理的消息,但是当我实现与 Runnable Graph 相同的流时,它起作用了。
我使用的示例 - https://www.programcreek.com/scala/akka.stream.scaladsl.RunnableGraph