How do I Restart a shutdown embeddedKafkaServer in a Spring Unit Test?

I have a Spring Boot unit test that verifies my application's ability to switch back once the primary Kafka cluster comes back online.

When the primary cluster goes offline, the application successfully fails over to the secondary. We have now added the ability to switch back to the primary on a timer rather than on a failure.
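
For context, the switch itself is built on Spring Kafka's ABSwitchCluster, which the producer factory uses as its bootstrap-servers supplier. The sketch below is an illustration only: the bean layout and the plain DefaultKafkaProducerFactory stand in for the custom KafkaSwitchCluster/BootStrapExposerProducerFactory classes that appear in the test.

   //Sketch only: Rochelle/Hudson are the primary/secondary bootstrap-server strings
   @Bean
   public ABSwitchCluster kafkaSwitchCluster()
   {
      return new ABSwitchCluster(Rochelle, Hudson); //primary first, secondary second
   }

   @Bean
   public ProducerFactory<String, String> producerFactory(ABSwitchCluster switchCluster)
   {
      Map<String, Object> config = new HashMap<>();
      config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,   StringSerializer.class);
      config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(config);
      pf.setBootstrapServersSupplier(switchCluster); //ABSwitchCluster implements Supplier<String>
      return pf;
   }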

My test method looks like this:

   //Rochelle = Primary BootStrapServers
   //Hudson   = Secondary BootStrapServers


   @Test
   public void send_switchback() throws Exception
   {
      //Get ABSwitchCluster to check failover details
      KafkaSwitchCluster ktSwitch = (KafkaSwitchCluster)
              ((BootStrapExposerProducerFactory)
                       kafkaTemplate.getProducerFactory()).getBootStrapSupplier();

      assertThat(ktSwitch,             notNullValue());
      assertThat(ktSwitch.get(),       is(Rochelle));
      assertThat(ktSwitch.isPrimary(), is(true));

      assertThat(getBootStrapServersList(), is(Rochelle));

      log.info("Shutdown Broker to test Failover.");

      //Shutdown Primary Servers to simulate disconnection
      shutdownBroker_primary();
      //Allow for fail over to happen
      if ( ktSwitch.isPrimary() )
      {
         try
         {
            synchronized (lock)
            {  //pause to give Idle Event a chance to fire
               for (int i = 0; i <= timeOut && ktSwitch.isPrimary(); ++i)
               //while ( ktSwitch.isPrimary() )
               {  //poll for cluster switch
                  lock.wait(Duration.ofSeconds(15).toMillis());
               }
            }
         }
         catch (InterruptedException IGNORE)
         { fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
      }

      //Confirm Failover has happened
      assertThat(ktSwitch.get(),            is(Hudson));
      assertThat(ktSwitch.isPrimary(),      is(false));
      assertThat(getBootStrapServersList(), is(Hudson));

      assertThat(kafkaSwitchCluster.get(),       is(Hudson));
      assertThat(kafkaSwitchCluster.isPrimary(), is(false));

      //Send a message on backup server
      String message = "Test Failover";
      send(message);

      String msg = records.poll(10, TimeUnit.SECONDS);
      assertThat(msg, notNullValue());
      assertThat(msg, is(message));

      startup_primary();
      //embeddedKafkaRule.getEmbeddedKafka();

      assertThat(embeddedKafka.getBrokersAsString(), is(Rochelle));
      String brokers = embeddedKafka.getBrokersAsString();

      if ( !kafkaProducerErrorHandler.areBrokersUp(brokers) )
      {
         synchronized (lock)
         {
            for ( int i=0;
                  i <= 15 && !kafkaProducerErrorHandler.areBrokersUp(brokers)
                  && registry.isRunning();
                  ++i )
            { lock.wait(Duration.ofSeconds(1).toMillis()); }
         }
      }

      //TODO: test Scheduled Fire
      kafkaProducerErrorHandler.primarySwitch();

      if ( !kafkaSwitchCluster.isPrimary() )
      {
         try
         {
            synchronized (lock)
            {  //pause to give Idle Event a chance to fire
               for (int i = 0; i <= timeOut && !kafkaSwitchCluster.isPrimary(); ++i)
               //while ( !ktSwitch.isPrimary() )
               {  //poll for cluster switch
                  lock.wait(Duration.ofSeconds(15).toMillis());
               }
            }
         }
         catch (InterruptedException IGNORE)
         { fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
      }

      assertThat(brokers,              anyOf(is(Rochelle), is(Hudson))); //port didn't change
      assertThat(brokers,              is(Rochelle)); //is primary
      assertThat(kafkaSwitchCluster.isPrimary(), is(true));
      //assertThat(ktSwitch.isPrimary(), is(true));
      assertThat(ktSwitch.get(),       is(brokers));

      assertThat(kafkaProducerErrorHandler.areBrokersUp(brokers),  is(true));
      assertThat(kafkaProducerErrorHandler.areBrokersUp(Rochelle), is(true));

      assertThat(ktSwitch.isPrimary(), is(true));
      //assertThat(ktSwitch.get(),       not(anyOf(is(Hudson), is(Rochelle))));
      assertThat(ktSwitch.get(),       is(embeddedKafka.getBrokersAsString()));

      //Send a message on backup server
      message = "Test newPrimary";
      send(message);

      msg = records.poll(10, TimeUnit.SECONDS);
      assertThat(msg, notNullValue());
      assertThat(msg, is(message));

      log.info("Test is finished");
   }

I am using this method to shut down my primary embedded Kafka:

   public void shutdownBroker_primary()
   {
      for(KafkaServer ks : embeddedKafka.getKafkaServers())
      { ks.shutdown(); }
      for(KafkaServer ks : embeddedKafka.getKafkaServers())
      { ks.awaitShutdown(); }
   }

And I am using this one to restart it:

   public void startup_primary()
   {
      //registry.stop();
      //kafkaSwitchCluster.Rochelle = embeddedKafka.getBrokersAsString();
      for(KafkaServer ks : embeddedKafka.getKafkaServers()) { ks.startup(); }
      registry.start();
   }

primarySwitch() is the scheduled event that switches the cluster back to the primary; the test calls it directly. It is a wrapper around the same code that switches the in-use cluster when Kafka fails.
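
For reference, a minimal sketch of what that scheduled switch-back could look like (the interval and field names are assumptions; the real implementation lives in the linked repository):

   @Scheduled(fixedDelay = 300_000) //check interval is an assumption
   public void primarySwitch()
   {
      //only switch back if we are on the secondary and the primary is reachable again
      if ( !kafkaSwitchCluster.isPrimary() && areBrokersUp(kafkaSwitchCluster.Rochelle) )
      {
         kafkaSwitchCluster.primary();  //ABSwitchCluster: point the supplier back at the primary
         producerFactory.reset();       //close cached producers so new ones pick up the new servers
         registry.stop();               //restart listener containers for the same reason
         registry.start();
      }
   }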

How do I get the primary embedded Kafka cluster to start up successfully after being shut down, so I can prove the application will move back to the primary once it is available again?


Update:
I have created a code sample on GitHub with what I have so far: https://github.com/raystorm/Kafka-Example .


Update 2: The repository linked above has been updated based on the accepted answer below, and all of the tests now pass.
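
With that change, the shutdown/startup helpers above presumably reduce to destroying and re-initializing the embedded broker rather than stopping and starting the individual KafkaServer instances. A sketch only, not the exact code from the repository:

   public void shutdownBroker_primary()
   {  //tear the embedded broker down completely
      embeddedKafka.destroy();
   }

   public void startup_primary()
   {  //build a fresh broker; previously produced records are not retained
      embeddedKafka.afterPropertiesSet();
      registry.start();
   }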

It isn't really designed for this use case, but the following works, as long as you don't need to retain data between broker instances...

@SpringBootTest
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template) throws Exception {
        SendResult<String, String> sendResult = template.send("so64145670", "foo").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
        this.broker.destroy();
        // restart
        this.broker.afterPropertiesSet();
        sendResult = template.send("so64145670", "bar").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
    }

}

EDIT

Here is a version with two brokers...

@SpringBootTest(classes = { So64145670Application.class, So64145670ApplicationTests.Config.class })
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private EmbeddedKafkaBroker secondBroker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template,
            @Autowired ProducerFactory<String, String> pf) throws Exception {

        SendResult<String, String> sendResult = template.send("so64145670", "foo").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
        KafkaTemplate<String, String> secondTemplate = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.secondBroker.getBrokersAsString()));
        sendResult = secondTemplate.send("so64145670-1", "foo").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
        this.embeddedKafka.destroy();
        this.secondBroker.destroy();
        // restart
        this.embeddedKafka.afterPropertiesSet();
        this.secondBroker.afterPropertiesSet();
        sendResult = template.send("so64145670", "bar").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
        sendResult = secondTemplate.send("so64145670-1", "bar").get(10, TimeUnit.SECONDS);
        System.out.println("+++" + sendResult.getRecordMetadata());
    }

    @Configuration
    public static class Config {

        @Bean
        EmbeddedKafkaBroker secondBroker() {
            return new EmbeddedKafkaBroker(1, true, "so64145670-1")
                    .brokerListProperty("spring.kafka.second.server");
        }

    }

}

The output from the println calls:

+++so64145670-1@0
+++so64145670-1-0@0
+++so64145670-1@0
+++so64145670-1-0@0