在 Consumer API 中使用 createDrainingControl?
Usage of createDrainingControl in Consumer API?
我正在阅读 Alpakka 中 Kafka 的 Consumer API 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那么为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(materializer);
您无法附加到 Sink.ignore,因为您已经附加了 Commiter.Sink。
但是您可以丢弃物化值。
该示例使用带有 Keep.both 的 toMat 来保留两个物化值,源中的控件和接收器中的 Future[Done]。
使用这两个值,它会在 mapMaterializedValue 中创建一个 DrainingControl,允许您停止流或在停止前排空流,或在流停止时收到通知。
如果您不关心这个控件(尽管您应该关心),您可以使用
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.to(Committer.sink(committerSettings.withMaxBatch(1)))
.run(materializer);
我正在阅读 Alpakka 中 Kafka 的 Consumer API 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那么为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(materializer);
您无法附加到 Sink.ignore,因为您已经附加了 Commiter.Sink。 但是您可以丢弃物化值。
该示例使用带有 Keep.both 的 toMat 来保留两个物化值,源中的控件和接收器中的 Future[Done]。 使用这两个值,它会在 mapMaterializedValue 中创建一个 DrainingControl,允许您停止流或在停止前排空流,或在流停止时收到通知。
如果您不关心这个控件(尽管您应该关心),您可以使用
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.to(Committer.sink(committerSettings.withMaxBatch(1)))
.run(materializer);