如何使用 ABSSwitchCluster 在 Kafka 集群之间进行故障转移
How to use ABSwitchCluster to failover between Kafka clusters
我有一个使用 Kafka 在实例之间同步数据的应用程序,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换和流式传输到另一个主题以供客户端使用.
我的应用程序有两个用于故障转移的集群。通过 Kafka 文档我发现了这个
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 谈论 ABSwitchCluster
.
对于 KafkaTemplate.send()
和 @KafkaListener
注释方法,如果 Kafka 集群出现故障,我如何使用 ABSwitchCluster 自动进行故障转移?
更新更多信息
我为 KafkaTemplate.send 和 Kafka 消费者事件 NonResponsiveConsumerEvent
和 ListenerContainerIdleEvent
添加了一些错误处理程序
最终他们调用了一个共享方法来切换,并且使用了一个BeanPostProcessor
来实际将ABSwitchCluster
添加到KafkaResourceFactory
Beans。
切换代码如下所示:
@Autowired
KafkaSwitchCluster kafkaSwitchCluster;
@Autowired
WebApplicationContext context;
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* Unable to use {@link Autowired} due to circular dependency
* with {@link KafkaPostProcessor}
* @return
*/
public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
{ return context.getBean(DefaultKafkaProducerFactory.class); }
/** Back-End Method to Actually Switch between the clusters */
private void switchCluster()
{
if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
else { kafkaSwitchCluster.primary(); }
getDefaultKafkaProducerFactory().reset();
registry.stop();
registry.destroy();
registry.start();
for(MessageListenerContainer listener : registry.getListenerContainers() )
{
listener.stop();
listener.start();
}
}
查看测试日志时,鉴于上述更新,生产者似乎正确地切换了集群,但我的消费者却没有。
那么我怎样才能让 @KafkaListener
消费者转换?
默认生产者和消费者工厂以及 KafkaAdmin
是 KafkaResourceFactory
的子类。
您通过调用 setBootstrapServersSupplier()
传递了 ABSwitchCluster
。
ABSwitchCluster
不会自动故障转移。
您需要自己的代码来执行故障转移,然后调用生产者工厂 reset()
和 stop/start 所有侦听器容器(KafkaListenerEndpointRegistry.stop()/start()
所有 @KafkaListener
s .
我有一个使用 Kafka 在实例之间同步数据的应用程序,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换和流式传输到另一个主题以供客户端使用.
我的应用程序有两个用于故障转移的集群。通过 Kafka 文档我发现了这个
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 谈论 ABSwitchCluster
.
对于 KafkaTemplate.send()
和 @KafkaListener
注释方法,如果 Kafka 集群出现故障,我如何使用 ABSwitchCluster 自动进行故障转移?
更新更多信息
我为 KafkaTemplate.send 和 Kafka 消费者事件 NonResponsiveConsumerEvent
和 ListenerContainerIdleEvent
最终他们调用了一个共享方法来切换,并且使用了一个BeanPostProcessor
来实际将ABSwitchCluster
添加到KafkaResourceFactory
Beans。
切换代码如下所示:
@Autowired
KafkaSwitchCluster kafkaSwitchCluster;
@Autowired
WebApplicationContext context;
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* Unable to use {@link Autowired} due to circular dependency
* with {@link KafkaPostProcessor}
* @return
*/
public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
{ return context.getBean(DefaultKafkaProducerFactory.class); }
/** Back-End Method to Actually Switch between the clusters */
private void switchCluster()
{
if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
else { kafkaSwitchCluster.primary(); }
getDefaultKafkaProducerFactory().reset();
registry.stop();
registry.destroy();
registry.start();
for(MessageListenerContainer listener : registry.getListenerContainers() )
{
listener.stop();
listener.start();
}
}
查看测试日志时,鉴于上述更新,生产者似乎正确地切换了集群,但我的消费者却没有。
那么我怎样才能让 @KafkaListener
消费者转换?
默认生产者和消费者工厂以及 KafkaAdmin
是 KafkaResourceFactory
的子类。
您通过调用 setBootstrapServersSupplier()
传递了 ABSwitchCluster
。
ABSwitchCluster
不会自动故障转移。
您需要自己的代码来执行故障转移,然后调用生产者工厂 reset()
和 stop/start 所有侦听器容器(KafkaListenerEndpointRegistry.stop()/start()
所有 @KafkaListener
s .