如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期
How to handle Kafka container lifecycle using spring kafka in Kubernetes multipod deployment
我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry
endpointRegistry.getListenerContainer("consumer1").stop();
endpointRegistry.getListenerContainer("consumer1").start();
我们正在 kubernetes pod 上部署微服务,因此同一个微服务可能有多个部署。我如何设法启动和停止所有容器上的消费者。
Kubernetes 没有提供自动向所有 pods 广播 http 请求的服务;所以你必须自己做。
通过 Kafka 广播
您可以从接收到主题的 http 请求的单个实例发布 start/stop 命令,专门用于在所有实例之间广播命令。
当然,您必须确保每个实例都可以读取该主题的所有消息,因此您需要防止分区在这些实例之间被平衡。您可以通过在该主题的消费者上设置唯一的组 ID(例如,通过在您的普通组 ID 后缀 UUID)来实现这一点。
通过 Http 广播
Kubernetes 知道哪些 pods 正在监听哪些端点,您可以在您的服务中获取该信息。 Spring Cloud Kubernetes (https://cloud.spring.io/spring-cloud-static/spring-cloud-kubernetes/2.0.0.M1/reference/html/#ribbon-discovery-in-kubernetes) 使获取该信息变得容易;可能有很多不同的方法可以做到这一点,使用 Spring Cloud Kubernetes 它会像这样:
在随机选择的 pod 上接收命令,从 Ribbon 获取服务器列表(它包含所有实例和可以到达它们的 ip-address/port ),并发送一个新的 http-request 到他们每个人。
我更喜欢 Kafka 方法,因为它的健壮性,如果您已经在使用 Spring 云,http 方法可能更容易实施。
我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry
endpointRegistry.getListenerContainer("consumer1").stop();
endpointRegistry.getListenerContainer("consumer1").start();
我们正在 kubernetes pod 上部署微服务,因此同一个微服务可能有多个部署。我如何设法启动和停止所有容器上的消费者。
Kubernetes 没有提供自动向所有 pods 广播 http 请求的服务;所以你必须自己做。
通过 Kafka 广播
您可以从接收到主题的 http 请求的单个实例发布 start/stop 命令,专门用于在所有实例之间广播命令。
当然,您必须确保每个实例都可以读取该主题的所有消息,因此您需要防止分区在这些实例之间被平衡。您可以通过在该主题的消费者上设置唯一的组 ID(例如,通过在您的普通组 ID 后缀 UUID)来实现这一点。
通过 Http 广播
Kubernetes 知道哪些 pods 正在监听哪些端点,您可以在您的服务中获取该信息。 Spring Cloud Kubernetes (https://cloud.spring.io/spring-cloud-static/spring-cloud-kubernetes/2.0.0.M1/reference/html/#ribbon-discovery-in-kubernetes) 使获取该信息变得容易;可能有很多不同的方法可以做到这一点,使用 Spring Cloud Kubernetes 它会像这样:
在随机选择的 pod 上接收命令,从 Ribbon 获取服务器列表(它包含所有实例和可以到达它们的 ip-address/port ),并发送一个新的 http-request 到他们每个人。
我更喜欢 Kafka 方法,因为它的健壮性,如果您已经在使用 Spring 云,http 方法可能更容易实施。