kafka 在停止时如何处理发送给它的消息?

How does the kafka handle the messages sent to it while it is stopped?

我是 Kafka 的新手,我一直在研究 Kafka 在停止时向它发送消息时的行为。

我遇到的情况是我使用 'Kubectl delete StatefulSet kafka_kf' 停止了 Kafka。然后我使用 java Kafka Producer 向 Kafka 发送一些消息。然后我再次启动 Kafka,这些发送到 Kafka 的消息确实在我启动 Kafka 的那一刻立即出现在消费者中。 知道在这种情况下 Kafka 内部会发生什么吗?以及如何防止这些消息出现在消费者中? 这些消息稍后会导致重复问题,这就是为什么我需要它们不出现的原因。

我看到消息出现在使用命令打开的消费者中:

kubectl exec -ti test -- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --isolation-level read_committed --topic testtopic

用于向kafka发送消息的代码是: producer.send(message)

首先,我认为理解 producer.send() 是一个异步调用很重要,所以它不会阻塞。其次,send() 方法实际上并没有将消息推送到代理,而是将消息放入本地内存中的二进制队列中。每个都有一个单独的二进制队列 划分生产者与之通信的主题。记录实际上是由生产者端的内部后台线程推送给代理的,该线程将由可配置的批处理阈值触发。此操作正在等待来自代理的确认(由 acks 设置配置),而不是 send() 方法。

[来源:Confluent Training - 构建 Apache Kafka 的开发人员技能]

当 Kafka 不可用时,您将在生产者中获得 TimeoutException。但是,这个异常可以通过重试来处理,生产者配置 retries 默认设置为 2147483647.

一旦您使 Kafka 可用,您的生产者就能够实际将消息发送到 Kafka,您的消费者将收到它们。

如果您不想接收这些消息,您需要设置 KafkaProducer 配置 retries=0

要了解有关生产者回调异常的更多信息,您可以查看我的另一个

编辑评论中的新问题:

Is there any way to find whether a message (or all the messages) was successfully sent or not?

您可以在发送数据时定义自定义回调 class,如下所示。如果消息的生成出现问题,此回调将抛出异常。

class ProducerCallback extends Callback {

  @Override
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      e.printStackTrace()
    }
  }

}

producer.send(message, new ProducerCallback)

作为替代方案,您可以简单地调用

producer.send(message).get()

因为这将阻塞,直到您收到来自 Kafka 代理的所有确认(请参阅 KafkaProducer 配置 acks)。