如何使用 ABSSwitchCluster 在 Kafka 集群之间进行故障转移

How to use ABSwitchCluster to failover between Kafka clusters

我有一个使用 Kafka 在实例之间同步数据的应用程序,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换和流式传输到另一个主题以供客户端使用.

我的应用程序有两个用于故障转移的集群。通过 Kafka 文档我发现了这个 https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 谈论 ABSwitchCluster.

对于 KafkaTemplate.send()@KafkaListener 注释方法,如果 Kafka 集群出现故障,我如何使用 ABSwitchCluster 自动进行故障转移?


更新更多信息

我为 KafkaTemplate.send 和 Kafka 消费者事件 NonResponsiveConsumerEventListenerContainerIdleEvent

添加了一些错误处理程序

最终他们调用了一个共享方法来切换,并且使用了一个BeanPostProcessor来实际将ABSwitchCluster添加到KafkaResourceFactory Beans。

切换代码如下所示:

   @Autowired
   KafkaSwitchCluster kafkaSwitchCluster;

   @Autowired
   WebApplicationContext context;

   @Autowired
   KafkaListenerEndpointRegistry registry;

   /**
    *  Unable to use {@link Autowired} due to circular dependency
    *  with {@link KafkaPostProcessor}
    *  @return
    */
   public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
   { return context.getBean(DefaultKafkaProducerFactory.class); }

   /** Back-End Method to Actually Switch between the clusters */
   private void switchCluster()
   {
      if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
      else { kafkaSwitchCluster.primary(); }

      getDefaultKafkaProducerFactory().reset();

      registry.stop();
      registry.destroy();
      registry.start();

      for(MessageListenerContainer listener : registry.getListenerContainers() )
      {
         listener.stop();
         listener.start();
      }
   }

查看测试日志时,鉴于上述更新,生产者似乎正确地切换了集群,但我的消费者却没有。

那么我怎样才能让 @KafkaListener 消费者转换?

默认生产者和消费者工厂以及 KafkaAdminKafkaResourceFactory 的子类。

您通过调用 setBootstrapServersSupplier() 传递了 ABSwitchCluster

ABSwitchCluster 不会自动故障转移。

您需要自己的代码来执行故障转移,然后调用生产者工厂 reset() 和 stop/start 所有侦听器容器(KafkaListenerEndpointRegistry.stop()/start() 所有 @KafkaListeners .