Why does auto-commit enabled Kafka client commit latest produced message's offset during consumer close even if the message was not consumed yet?
TLDR:
- Is it expected behavior for a Kafka client with auto-commit enabled to commit the offset of a produced message as if it had been consumed, even though it was not? (for an application that consumes from and produces to the same topic)
Detailed explanation:
I have a simple Scala application with an Akka actor that consumes messages from a Kafka topic and, if any exception occurs during message processing, produces a message to the same topic.
override protected def processMessage(messages: Seq[ConsumerRecord[String, String]]): Future[Done] = {
  Future.sequence(messages.map(message => {
    logger.info(s"--CONSUMED: offset: ${message.offset()} message: ${message.value()}")
    // in actual implementation, some process is done here and if an exception occurs, the message is sent to the same topic as seen below
    sendToExceptionTopic(Instant.now().toEpochMilli)
    Thread.sleep(1000)
    Future(Done)
  })).transformWith(_ => Future(Done))
}
This actor is started every minute, runs for 20 seconds, and then stops.
def init(): Unit = {
  exceptionManagerActor ! InitExceptionActors
  system.scheduler.schedule(2.second, 60.seconds) {
    logger.info("started consuming messages")
    exceptionManagerActor ! ConsumeExceptions
  }
}

private def startScheduledActor(actorRef: ActorRef): Unit = {
  actorRef ! Start
  context.system.scheduler.scheduleOnce(20.seconds) {
    logger.info("stopping consuming messages")
    actorRef ! Stop
  }
}
override def receive: Receive = {
  case Start =>
    consumerBase = consumer
      .groupedWithin(20, 2000.millisecond)
      .mapAsyncUnordered(10)(processMessage)
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()

  case Stop =>
    consumerBase.drainAndShutdown().transformWith {
      case Success(value) =>
        logger.info("actor stopped")
        Future(value)
      case Failure(ex) =>
        logger.error("error: ", ex)
        Future.failed(ex)
    }
    //Await.result(consumerBase.drainAndShutdown(), 1.minute)
}
With this configuration, the Kafka client commits the offset of the latest produced message when it is stopped, as if that message had already been consumed.
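For reference, a minimal sketch (not the project's actual configuration) of what a consumer with auto-commit enabled looks like in Alpakka Kafka; the broker address, group id and commit interval below are placeholder values:

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

def autoCommitConsumerSettings(system: ActorSystem): ConsumerSettings[String, String] =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")                              // placeholder broker
    .withGroupId("exception-consumer-group")                             // placeholder group id
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")      // auto-commit enabled
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000") // Kafka's default commit interval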
Sample log:
14:28:48.868 INFO - started consuming messages
14:28:50.945 INFO - --CONSUMED: offset: 97 message: 1
14:28:51.028 INFO - ----PRODUCED: offset: 98 message: 1643542130945
...
14:29:08.886 INFO - stopping consuming messages
14:29:08.891 INFO - --CONSUMED: offset: 106 message: 1643542147106
14:29:08.895 INFO - ----PRODUCED: offset: 107 message: 1643542148891 <------ this message was lost
14:29:39.946 INFO - actor stopped
14:29:39.956 INFO - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://test-consumer/system/Materializers/StreamSupervisor-2/$$a#1541548736] to Actor[akka://test-consumer/system/kafka-consumer-1#914599016] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://test-consumer/system/kafka-consumer-1#914599016] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14:29:48.866 INFO - started consuming messages <----- the message with offset 107 was expected to be consumed in this cycle, but it was not
14:30:08.871 INFO - stopping consuming messages
14:30:38.896 INFO - actor stopped
As can be seen from the logs, a message with offset 107 was produced, but it was not consumed in the next cycle.
Actually, I am not an expert on Akka actors and I don't know whether this situation originates from Kafka or from Akka, but it seems to be related to auto-commit.
Dependency versions used:
lazy val versions = new {
  val akka = "2.6.13"
  val akkaHttp = "10.1.9"
  val alpAkka = "2.0.7"
  val logback = "1.2.3"
  val apacheCommons = "1.7"
  val json4s = "3.6.7"
}

libraryDependencies ++= {
  Seq(
    "com.typesafe.akka" %% "akka-slf4j" % versions.akka,
    "com.typesafe.akka" %% "akka-stream-kafka" % versions.alpAkka,
    "com.typesafe.akka" %% "akka-http" % versions.akkaHttp,
    "com.typesafe.akka" %% "akka-protobuf" % versions.akka,
    "com.typesafe.akka" %% "akka-stream" % versions.akka,
    "ch.qos.logback" % "logback-classic" % versions.logback,
    "org.json4s" %% "json4s-jackson" % versions.json4s,
    "org.apache.commons" % "commons-text" % versions.apacheCommons,
  )
}
The sample source code and the steps to reproduce the situation are available in this repository.
As far as Kafka is concerned, the message is consumed as soon as Alpakka Kafka reads it from Kafka. This is before the actor inside Alpakka Kafka sends it to the downstream consumer for application-level processing.
Kafka auto-commit (enable.auto.commit = true) will therefore result in the offset being committed before the message has even been sent to your actor.
The Kafka documentation on offset management does (as of this writing) refer to enable.auto.commit as having at-least-once semantics, but as noted in the first paragraph, this is at-least-once delivery semantics, not at-least-once processing semantics. The latter is an application-level concern, and accomplishing it requires delaying the offset commit until processing has completed.
The Alpakka Kafka documentation has an involved discussion about at-least-once processing: in this case, at-least-once processing will likely entail introducing manual offset committing and replacing mapAsyncUnordered with mapAsync (since mapAsyncUnordered in conjunction with manual offset committing means that your application can only guarantee that a message from Kafka gets processed at-least-zero times).
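A minimal sketch of that approach, assuming Consumer.committableSource and Committer.sink from Alpakka Kafka; the broker address, group id, topic name and processRecord below are placeholders, not the repository's actual code:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("at-least-once-sketch")
import system.dispatcher

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")   // placeholder broker
    .withGroupId("exception-consumer-group")  // placeholder group id

// placeholder for the application's real processing logic
def processRecord(value: String): Future[Done] =
  Future { println(s"processing $value"); Done }

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("exception-topic")) // placeholder topic
    // mapAsync (not mapAsyncUnordered) so offsets reach the committer in order
    .mapAsync(10) { msg =>
      processRecord(msg.record.value()).map(_ => msg.committableOffset)
    }
    // offsets are committed only after the corresponding record has been processed
    .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
    .run()

With this shape, control.drainAndShutdown() should stop fetching, let in-flight records finish processing and commit, and only then shut down the consumer, so a record that was read but not yet processed is not silently skipped.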
The broad taxonomy of message-processing guarantees in Alpakka Kafka:
- hard at-most-once: Consumer.atMostOnceSource - commits after every message, before processing (see the sketch after this list)
- soft at-most-once: enable.auto.commit = true - "soft" because the commits are actually batched for increased throughput, so this is really "at-most-once, except when it's at-least-once"
- hard at-least-once: manual commit only after all processing has been verified to succeed
- soft at-least-once: manual commit after some processing has completed (i.e. "at-least-once, except when it's at-most-once")
- exactly-once: not possible in general, but if your processing has a way to deduplicate and thus make duplicates idempotent, you can get effectively-once
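To illustrate the first category, a minimal sketch of Consumer.atMostOnceSource, reusing the placeholder consumerSettings, processRecord and implicit system from the sketch above: each record's offset is committed before the record reaches the processing stage, so a failure during processing loses that record rather than reprocessing it.

import akka.stream.scaladsl.Sink

val atMostOnceControl: DrainingControl[Done] =
  Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics("exception-topic")) // placeholder topic
    .mapAsync(1)(record => processRecord(record.value()))
    .toMat(Sink.ignore)(DrainingControl.apply)
    .run()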