What are the differences between Consumer committableSource and plainSource?
I am trying to use the committableSource method of the Alpakka Kafka consumer library (https://doc.akka.io/docs/alpakka-kafka/current/consumer.html), as follows:
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("SAP-EVENT-BUS"))
      .map(_.committableOffset)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()
The question is: how do I get the messages that the consumer receives from Kafka?
The following snippet works:
    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.topics("SAP-EVENT-BUS"))
      .to(Sink.foreach(println))
      .run()
The whole code snippet:
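    import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    // The members below live inside an actor, hence the use of context.system.
    private implicit val materializer = ActorMaterializer()
    private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
    private val consumerSettings =
      ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("SAP-SENDER-GROUP")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    private val committerSettings = CommitterSettings(context.system)

    // Stream 1: commits offsets, but the records themselves are dropped by the map.
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("TOPIC"))
      .map(_.committableOffset)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

    // Stream 2: prints every record, but never commits explicitly.
    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.topics("SAP-EVENT-BUS"))
      .to(Sink.foreach(println))
      .run()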
Or do I have to use both, one for committing and the other for consuming?
Instead of Committer.sink, which terminates the stream, use Committer.flow, which lets you continue the stream until you choose to terminate it with a different sink.
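As a rough sketch of that suggestion, the stream below reads each record from the committable message before passing the offset on to Committer.flow. It reuses the settings from the question. The object name CommittableConsumerSketch, the println handling, the final Sink.ignore, and building the settings from an ActorSystem (Akka 2.6 or later, where the system provides the materializer) are assumptions made for illustration, not part of the original post.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical standalone wrapper; in the question this code lives inside an actor.
object CommittableConsumerSketch extends App {
  // Assumption: Akka 2.6+, so the implicit system also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("consumer-sketch")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("SAP-SENDER-GROUP")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  val committerSettings = CommitterSettings(system)

  val control: DrainingControl[Done] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("SAP-EVENT-BUS"))
      .map { msg =>
        // msg.record is the ConsumerRecord received from Kafka.
        println(s"${msg.record.key()} -> ${msg.record.value()}")
        // Only the offset travels further downstream, to be committed.
        msg.committableOffset
      }
      // Commits offsets in batches and keeps the stream open.
      .via(Committer.flow(committerSettings))
      // Any other sink could terminate the stream here.
      .toMat(Sink.ignore)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()
}
```

A committable message carries both the record (msg.record) and its offset (msg.committableOffset), so one committableSource stream is enough for consuming and committing; the original `.map(_.committableOffset)` is the step that discards the record. Note that Committer.flow emits Done values rather than messages, so any processing of the record has to happen before that stage.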