kafka 在停止时如何处理发送给它的消息?
How does the kafka handle the messages sent to it while it is stopped?
我是 Kafka 的新手,我一直在研究 Kafka 在停止时向它发送消息时的行为。
我遇到的情况是我使用 'Kubectl delete StatefulSet kafka_kf' 停止了 Kafka。然后我使用 java Kafka Producer 向 Kafka 发送一些消息。然后我再次启动 Kafka,这些发送到 Kafka 的消息确实在我启动 Kafka 的那一刻立即出现在消费者中。 知道在这种情况下 Kafka 内部会发生什么吗?以及如何防止这些消息出现在消费者中? 这些消息稍后会导致重复问题,这就是为什么我需要它们不出现的原因。
我看到消息出现在使用命令打开的消费者中:
kubectl exec -ti test -- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --isolation-level read_committed --topic testtopic
用于向kafka发送消息的代码是:
producer.send(message)
首先,我认为理解 producer.send()
是一个异步调用很重要,所以它不会阻塞。其次,send()
方法实际上并没有将消息推送到代理,而是将消息放入本地内存中的二进制队列中。每个都有一个单独的二进制队列
划分生产者与之通信的主题。记录实际上是由生产者端的内部后台线程推送给代理的,该线程将由可配置的批处理阈值触发。此操作正在等待来自代理的确认(由 acks 设置配置),而不是 send() 方法。
[来源:Confluent Training - 构建 Apache Kafka 的开发人员技能]
当 Kafka 不可用时,您将在生产者中获得 TimeoutException
。但是,这个异常可以通过重试来处理,生产者配置 retries
默认设置为 2147483647.
一旦您使 Kafka 可用,您的生产者就能够实际将消息发送到 Kafka,您的消费者将收到它们。
如果您不想接收这些消息,您需要设置 KafkaProducer 配置 retries=0
。
要了解有关生产者回调异常的更多信息,您可以查看我的另一个 。
编辑评论中的新问题:
Is there any way to find whether a message (or all the messages) was successfully sent or not?
您可以在发送数据时定义自定义回调 class,如下所示。如果消息的生成出现问题,此回调将抛出异常。
class ProducerCallback extends Callback {
@Override
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
e.printStackTrace()
}
}
}
producer.send(message, new ProducerCallback)
作为替代方案,您可以简单地调用
producer.send(message).get()
因为这将阻塞,直到您收到来自 Kafka 代理的所有确认(请参阅 KafkaProducer 配置 acks
)。
我是 Kafka 的新手,我一直在研究 Kafka 在停止时向它发送消息时的行为。
我遇到的情况是我使用 'Kubectl delete StatefulSet kafka_kf' 停止了 Kafka。然后我使用 java Kafka Producer 向 Kafka 发送一些消息。然后我再次启动 Kafka,这些发送到 Kafka 的消息确实在我启动 Kafka 的那一刻立即出现在消费者中。 知道在这种情况下 Kafka 内部会发生什么吗?以及如何防止这些消息出现在消费者中? 这些消息稍后会导致重复问题,这就是为什么我需要它们不出现的原因。
我看到消息出现在使用命令打开的消费者中:
kubectl exec -ti test -- ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --isolation-level read_committed --topic testtopic
用于向kafka发送消息的代码是:
producer.send(message)
首先,我认为理解 producer.send()
是一个异步调用很重要,所以它不会阻塞。其次,send()
方法实际上并没有将消息推送到代理,而是将消息放入本地内存中的二进制队列中。每个都有一个单独的二进制队列
划分生产者与之通信的主题。记录实际上是由生产者端的内部后台线程推送给代理的,该线程将由可配置的批处理阈值触发。此操作正在等待来自代理的确认(由 acks 设置配置),而不是 send() 方法。
[来源:Confluent Training - 构建 Apache Kafka 的开发人员技能]
当 Kafka 不可用时,您将在生产者中获得 TimeoutException
。但是,这个异常可以通过重试来处理,生产者配置 retries
默认设置为 2147483647.
一旦您使 Kafka 可用,您的生产者就能够实际将消息发送到 Kafka,您的消费者将收到它们。
如果您不想接收这些消息,您需要设置 KafkaProducer 配置 retries=0
。
要了解有关生产者回调异常的更多信息,您可以查看我的另一个
编辑评论中的新问题:
Is there any way to find whether a message (or all the messages) was successfully sent or not?
您可以在发送数据时定义自定义回调 class,如下所示。如果消息的生成出现问题,此回调将抛出异常。
class ProducerCallback extends Callback {
@Override
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (e != null) {
e.printStackTrace()
}
}
}
producer.send(message, new ProducerCallback)
作为替代方案,您可以简单地调用
producer.send(message).get()
因为这将阻塞,直到您收到来自 Kafka 代理的所有确认(请参阅 KafkaProducer 配置 acks
)。