Akka 流过滤后的 kafka 提交偏移量
Akka streams kafka commit offset after filter
我正在尝试为 akka 流中的偏移量设置一个至少一次提交策略,但我无法理解在我的流上使用过滤器的情况下预期的模式是什么。
我的预期是 none 过滤后的消息将得到它们的偏移量,因此它们最终将进入无限循环处理。
一个说明这一点的荒谬例子是像这样过滤所有消息:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
我只能看到一个解决方案,将我的过滤器包装在流程中,检查逻辑是否会过滤掉并在这种情况下提交,但这似乎并不优雅,并且削弱了过滤器形状的价值。
过滤并不是一件罕见的事情,但我看不出有什么优雅的方式来提交偏移量?对我来说,框架无法做到这一点似乎很奇怪,所以我错过了什么?
我无法找到当前 akka 实现的解决方案来更智能地提交索引,因此我将责任委派给 kafka 级别的 kafka 设置 auto-commit 并将其与应用程序的正常关闭策略,因此当 blue/green 部署发生时,所有消息都在应用程序关闭之前处理。
- 自动提交为真:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
- 正常关机:
val actorMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
actorMaterializer.system.terminate()
Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}
我正在尝试为 akka 流中的偏移量设置一个至少一次提交策略,但我无法理解在我的流上使用过滤器的情况下预期的模式是什么。
我的预期是 none 过滤后的消息将得到它们的偏移量,因此它们最终将进入无限循环处理。
一个说明这一点的荒谬例子是像这样过滤所有消息:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
我只能看到一个解决方案,将我的过滤器包装在流程中,检查逻辑是否会过滤掉并在这种情况下提交,但这似乎并不优雅,并且削弱了过滤器形状的价值。
过滤并不是一件罕见的事情,但我看不出有什么优雅的方式来提交偏移量?对我来说,框架无法做到这一点似乎很奇怪,所以我错过了什么?
我无法找到当前 akka 实现的解决方案来更智能地提交索引,因此我将责任委派给 kafka 级别的 kafka 设置 auto-commit 并将其与应用程序的正常关闭策略,因此当 blue/green 部署发生时,所有消息都在应用程序关闭之前处理。
- 自动提交为真:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
- 正常关机:
val actorMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
actorMaterializer.system.terminate()
Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}