如何安全地取消订阅 Kafka 中的主题
How to safely unsubscribe to a topic in Kafka
我有一个简单的 java 程序(dockerized)并部署在 kubernetes (pod) 中
这个 java 程序只是一个正常的 java 项目,它听取和消费特定主题。例如。样本安全主题
我必须安全地取消订阅这个主题,这意味着即使我删除了这个 pod(java 消费者)也不会丢失任何数据。
这是我搜索看到的代码:
public static void unsubscribeSafelyFromKafka() {
logger.debug("Safely unsubscribe to topic..");
if (myKakfaConsumer != null) {
myKafkaConsumer.unsubscribe();
myKafkaConsumer.close();
}
}
我需要通过命令行 运行 执行此操作,其中 Java 程序已经有一个现有的静态 main 方法。
我的问题是:
- 上面的代码能保证没有记录丢失吗?
- 当已经存在静态 main() 时,如何通过命令行触发上面的代码
注意:我正在通过命令行 运行 宁 java 项目。例如。 java -jar MyKafkaConsumer.jar 因为这是要求。
请帮忙
如果我对问题 1 的理解是正确的,您担心在通过控制台命令触发的一个线程取消订阅后,轮询消费者正在处理一批记录的风险,如果 pod 被杀死,这些记录可能会丢失?
如果您有其他 pods 作为同一个消费者组的一部分消费,或者如果这个或任何 pod 使用相同的组 ID 再次订阅,那么最后提交的偏移量将确保没有记录丢失(虽然有些可能被处理不止一次)因为这是接管的消费者将从那里开始。
如果您使用最安全的自动提交,因为每次提交都发生在后续轮询中,因此您不可能提交尚未处理的记录(只要您不产生额外的线程来进行处理) .手动提交由您决定何时处理记录以及何时可以安全提交。
但是,在取消订阅后调用关闭是一个好主意,并且应该确保干净地完成当前轮询批处理并提交最终偏移量,只要所有这些都发生在超时期限内。
关于问题 2,如果您需要手动取消订阅,那么我认为您需要 JMX 或公开一个 API 或类似的方法来调用 运行 JVM 上的方法。但是,如果您只是想确保在 pod 终止时安全关闭,您可以在关闭挂钩中取消订阅,或者不用担心,因为偏移量提交提供了安全性。
我有一个简单的 java 程序(dockerized)并部署在 kubernetes (pod) 中 这个 java 程序只是一个正常的 java 项目,它听取和消费特定主题。例如。样本安全主题
我必须安全地取消订阅这个主题,这意味着即使我删除了这个 pod(java 消费者)也不会丢失任何数据。
这是我搜索看到的代码:
public static void unsubscribeSafelyFromKafka() {
logger.debug("Safely unsubscribe to topic..");
if (myKakfaConsumer != null) {
myKafkaConsumer.unsubscribe();
myKafkaConsumer.close();
}
}
我需要通过命令行 运行 执行此操作,其中 Java 程序已经有一个现有的静态 main 方法。
我的问题是:
- 上面的代码能保证没有记录丢失吗?
- 当已经存在静态 main() 时,如何通过命令行触发上面的代码
注意:我正在通过命令行 运行 宁 java 项目。例如。 java -jar MyKafkaConsumer.jar 因为这是要求。
请帮忙
如果我对问题 1 的理解是正确的,您担心在通过控制台命令触发的一个线程取消订阅后,轮询消费者正在处理一批记录的风险,如果 pod 被杀死,这些记录可能会丢失?
如果您有其他 pods 作为同一个消费者组的一部分消费,或者如果这个或任何 pod 使用相同的组 ID 再次订阅,那么最后提交的偏移量将确保没有记录丢失(虽然有些可能被处理不止一次)因为这是接管的消费者将从那里开始。
如果您使用最安全的自动提交,因为每次提交都发生在后续轮询中,因此您不可能提交尚未处理的记录(只要您不产生额外的线程来进行处理) .手动提交由您决定何时处理记录以及何时可以安全提交。
但是,在取消订阅后调用关闭是一个好主意,并且应该确保干净地完成当前轮询批处理并提交最终偏移量,只要所有这些都发生在超时期限内。
关于问题 2,如果您需要手动取消订阅,那么我认为您需要 JMX 或公开一个 API 或类似的方法来调用 运行 JVM 上的方法。但是,如果您只是想确保在 pod 终止时安全关闭,您可以在关闭挂钩中取消订阅,或者不用担心,因为偏移量提交提供了安全性。