如何在 Spring 单元测试中重启关闭的 embeddedKafkaServer?
How do I Restart a shutdown embeddedKafkaServer in a Spring Unit Test?
我有一个 Spring Boot 单元测试,用来测试当主 Kafka 集群重新联机时,我的应用程序切换回主集群的功能。
当主集群脱机时,应用程序能够成功切换到辅助集群。现在我们又添加了由定时器(而不是由故障)触发切换回主集群的功能。
我的测试方法是这样的:
//Rochelle = Primary BootStrapServers
//Hudson = Secondary BootStrapServers
/**
 * Exercises fail-over from the primary bootstrap servers (Rochelle) to the
 * secondary ones (Hudson) and the subsequent switch back to the primary.
 *
 * Steps: assert the switch starts on Rochelle, shut the primary down, wait
 * for the switch to leave primary, send/receive on Hudson, restart the
 * primary, invoke primarySwitch() directly, then assert the switch reports
 * Rochelle again and send/receive once more.
 *
 * @throws Exception if the un-guarded wait/poll calls are interrupted, or
 *         whatever the invoked helpers propagate
 */
@Test
public void send_switchback() throws Exception
{
//Get ABSwitchCluster to check failover details
//(the producer factory is cast to the project type that exposes its bootstrap supplier)
KafkaSwitchCluster ktSwitch = (KafkaSwitchCluster)
((BootStrapExposerProducerFactory)
kafkaTemplate.getProducerFactory()).getBootStrapSupplier();
//Precondition: the switch exists and currently points at the primary (Rochelle)
assertThat(ktSwitch, notNullValue());
assertThat(ktSwitch.get(), is(Rochelle));
assertThat(ktSwitch.isPrimary(), is(true));
assertThat(getBootStrapServersList(), is(Rochelle));
log.info("Shutdown Broker to test Failover.");
//Shutdown Primary Servers to simulate disconnection
shutdownBroker_primary();
//Allow for fail over to happen
if ( ktSwitch.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
//At most timeOut + 1 waits of up to 15 seconds each; stops early once the switch leaves primary.
//NOTE(review): nothing in this method notifies lock, so wait() presumably acts as a timed pause - confirm.
for (int i = 0; i <= timeOut && ktSwitch.isPrimary(); ++i)
//while ( ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
//An interrupt fails the test; NOTE(review): the thread's interrupt flag is not restored here.
catch (InterruptedException IGNORE)
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
//Confirm Failover has happened
//(both the template's supplier and the kafkaSwitchCluster field must report Hudson / not primary)
assertThat(ktSwitch.get(), is(Hudson));
assertThat(ktSwitch.isPrimary(), is(false));
assertThat(getBootStrapServersList(), is(Hudson));
assertThat(kafkaSwitchCluster.get(), is(Hudson));
assertThat(kafkaSwitchCluster.isPrimary(), is(false));
//Send a message on backup server
//and expect to read the same text back from records within 10 seconds
String message = "Test Failover";
send(message);
String msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
//Bring the primary back; its broker address string is expected to still equal Rochelle
startup_primary();
//embeddedKafkaRule.getEmbeddedKafka();
assertThat(embeddedKafka.getBrokersAsString(), is(Rochelle));
String brokers = embeddedKafka.getBrokersAsString();
//Wait for the restarted brokers: at most 16 waits of up to 1 second each,
//ending early when areBrokersUp(brokers) turns true or registry stops running.
//This wait is not wrapped in try/catch, so an InterruptedException propagates out of the test.
if ( !kafkaProducerErrorHandler.areBrokersUp(brokers) )
{
synchronized (lock)
{
for ( int i=0;
i <= 15 && !kafkaProducerErrorHandler.areBrokersUp(brokers)
&& registry.isRunning();
++i )
{ lock.wait(Duration.ofSeconds(1).toMillis()); }
}
}
//TODO: test Scheduled Fire
//The switch-back is invoked directly here rather than by waiting for its schedule.
kafkaProducerErrorHandler.primarySwitch();
//Allow for the switch back to primary to happen (same bounded wait as the fail-over above)
if ( !kafkaSwitchCluster.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
for (int i = 0; i <= timeOut && !kafkaSwitchCluster.isPrimary(); ++i)
//while ( !ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
//An interrupt fails the test; NOTE(review): the thread's interrupt flag is not restored here.
catch (InterruptedException IGNORE)
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
//Confirm the switch back: the broker address is unchanged, is the primary, and is reachable
assertThat(brokers, anyOf(is(Rochelle), is(Hudson))); //port didn't change
assertThat(brokers, is(Rochelle)); //is primary
assertThat(kafkaSwitchCluster.isPrimary(), is(true));
//assertThat(ktSwitch.isPrimary(), is(true));
assertThat(ktSwitch.get(), is(brokers));
assertThat(kafkaProducerErrorHandler.areBrokersUp(brokers), is(true));
assertThat(kafkaProducerErrorHandler.areBrokersUp(Rochelle), is(true));
assertThat(ktSwitch.isPrimary(), is(true));
//assertThat(ktSwitch.get(), not(anyOf(is(Hudson), is(Rochelle))));
assertThat(ktSwitch.get(), is(embeddedKafka.getBrokersAsString()));
//Send a message now that the switch reports the primary again
//and expect to read the same text back from records within 10 seconds
message = "Test newPrimary";
send(message);
msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
log.info("Test is finished");
}
我正在使用这种方法关闭我的主嵌入式 Kafka
/**
 * Stops every Kafka server held by {@code embeddedKafka}.
 *
 * Shutdown is requested on all servers first; only afterwards does the method
 * block on each server until its shutdown has completed.
 */
public void shutdownBroker_primary()
{
    // Pass 1: ask each server to shut down.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::shutdown);
    // Pass 2: wait for each server to finish shutting down.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::awaitShutdown);
}
我正在使用它来重启 Kafka:
/**
 * Calls {@code startup()} on every Kafka server held by {@code embeddedKafka}
 * and then starts {@code registry}.
 *
 * NOTE(review): the surrounding question reports that the primary does not come
 * back successfully after this call; the answer in this document restarts the
 * broker with EmbeddedKafkaBroker.destroy() followed by afterPropertiesSet()
 * instead. Behaviour here is intentionally left as posted.
 */
public void startup_primary()
{
    // Earlier attempts, left disabled:
    //registry.stop();
    //kafkaSwitchCluster.Rochelle = embeddedKafka.getBrokersAsString();
    embeddedKafka.getKafkaServers().forEach(KafkaServer::startup);
    registry.start();
}
primarySwitch()
是将集群切换回主集群的计划事件。它在测试中直接调用。它是围绕相同代码的包装器,可在 Kafka 出现故障时切换正在使用的集群。
如何让主嵌入式 Kafka 集群在关闭后成功启动,以便我可以证明应用程序可以在主集群再次可用后成功移回主集群?
更新:
我已经在 Github 上用我目前所拥有的创建了代码示例:https://github.com/raystorm/Kafka-Example .
更新 2:上面链接的代码仓库已经根据下面被采纳的答案进行了更新,现在所有测试都通过了。
它并不是真正为这个用例设计的,但只要您不需要在代理实例之间保留数据,以下内容就可以工作...
/**
 * Demonstrates restarting a single embedded Kafka broker inside one test:
 * a record is sent, the broker is destroyed and re-initialised, and a second
 * record is sent through the same template.
 */
@SpringBootTest
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template) throws Exception {
        sendAndPrint(template, "foo");

        // Tear the broker down, then bring it back up through its init callback.
        this.broker.destroy();
        this.broker.afterPropertiesSet();

        sendAndPrint(template, "bar");
    }

    /** Sends {@code value} to the test topic, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> template, String value) throws Exception {
        SendResult<String, String> outcome = template.send("so64145670", value).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }
}
编辑
下面是一个使用两个 broker(代理)的示例...
/**
 * Demonstrates restarting two embedded Kafka brokers inside one test: a record
 * is sent to each broker, both brokers are destroyed and re-initialised, and a
 * second record is sent to each through the same templates.
 */
@SpringBootTest(classes = { So64145670Application.class, So64145670ApplicationTests.Config.class })
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    // NOTE(review): two EmbeddedKafkaBroker beans exist in this context; the field
    // names presumably select between them, so they are kept unchanged - confirm.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private EmbeddedKafkaBroker secondBroker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template,
            @Autowired ProducerFactory<String, String> pf) throws Exception {
        sendAndPrint(template, "so64145670", "foo");

        // Second template: same producer factory, bootstrap servers overridden to the second broker.
        String secondServers = this.secondBroker.getBrokersAsString();
        KafkaTemplate<String, String> secondTemplate =
                new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, secondServers));
        sendAndPrint(secondTemplate, "so64145670-1", "foo");

        // Tear both brokers down ...
        this.embeddedKafka.destroy();
        this.secondBroker.destroy();
        // ... and bring them back up in the same order.
        this.embeddedKafka.afterPropertiesSet();
        this.secondBroker.afterPropertiesSet();

        sendAndPrint(template, "so64145670", "bar");
        sendAndPrint(secondTemplate, "so64145670-1", "bar");
    }

    /** Sends {@code value} to {@code topic}, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> sender, String topic, String value)
            throws Exception {
        SendResult<String, String> outcome = sender.send(topic, value).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }

    /** Registers the second embedded broker with its own topic and broker-list property. */
    @Configuration
    public static class Config {

        @Bean
        EmbeddedKafkaBroker secondBroker() {
            EmbeddedKafkaBroker second = new EmbeddedKafkaBroker(1, true, "so64145670-1");
            return second.brokerListProperty("spring.kafka.second.server");
        }
    }
}
+++so64145670-1@0
+++so64145670-1-0@0
+++so64145670-1@0
+++so64145670-1-0@0
我有一个 Spring Boot 单元测试,用来测试当主 Kafka 集群重新联机时,我的应用程序切换回主集群的功能。
当主集群脱机时,应用程序能够成功切换到辅助集群。现在我们又添加了由定时器(而不是由故障)触发切换回主集群的功能。
我的测试方法是这样的:
//Rochelle = Primary BootStrapServers
//Hudson = Secondary BootStrapServers
/**
 * Exercises fail-over from the primary bootstrap servers (Rochelle) to the
 * secondary ones (Hudson) and the subsequent switch back to the primary.
 *
 * Steps: assert the switch starts on Rochelle, shut the primary down, wait
 * for the switch to leave primary, send/receive on Hudson, restart the
 * primary, invoke primarySwitch() directly, then assert the switch reports
 * Rochelle again and send/receive once more.
 *
 * @throws Exception if the un-guarded wait/poll calls are interrupted, or
 *         whatever the invoked helpers propagate
 */
@Test
public void send_switchback() throws Exception
{
//Get ABSwitchCluster to check failover details
//(the producer factory is cast to the project type that exposes its bootstrap supplier)
KafkaSwitchCluster ktSwitch = (KafkaSwitchCluster)
((BootStrapExposerProducerFactory)
kafkaTemplate.getProducerFactory()).getBootStrapSupplier();
//Precondition: the switch exists and currently points at the primary (Rochelle)
assertThat(ktSwitch, notNullValue());
assertThat(ktSwitch.get(), is(Rochelle));
assertThat(ktSwitch.isPrimary(), is(true));
assertThat(getBootStrapServersList(), is(Rochelle));
log.info("Shutdown Broker to test Failover.");
//Shutdown Primary Servers to simulate disconnection
shutdownBroker_primary();
//Allow for fail over to happen
if ( ktSwitch.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
//At most timeOut + 1 waits of up to 15 seconds each; stops early once the switch leaves primary.
//NOTE(review): nothing in this method notifies lock, so wait() presumably acts as a timed pause - confirm.
for (int i = 0; i <= timeOut && ktSwitch.isPrimary(); ++i)
//while ( ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
//An interrupt fails the test; NOTE(review): the thread's interrupt flag is not restored here.
catch (InterruptedException IGNORE)
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
//Confirm Failover has happened
//(both the template's supplier and the kafkaSwitchCluster field must report Hudson / not primary)
assertThat(ktSwitch.get(), is(Hudson));
assertThat(ktSwitch.isPrimary(), is(false));
assertThat(getBootStrapServersList(), is(Hudson));
assertThat(kafkaSwitchCluster.get(), is(Hudson));
assertThat(kafkaSwitchCluster.isPrimary(), is(false));
//Send a message on backup server
//and expect to read the same text back from records within 10 seconds
String message = "Test Failover";
send(message);
String msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
//Bring the primary back; its broker address string is expected to still equal Rochelle
startup_primary();
//embeddedKafkaRule.getEmbeddedKafka();
assertThat(embeddedKafka.getBrokersAsString(), is(Rochelle));
String brokers = embeddedKafka.getBrokersAsString();
//Wait for the restarted brokers: at most 16 waits of up to 1 second each,
//ending early when areBrokersUp(brokers) turns true or registry stops running.
//This wait is not wrapped in try/catch, so an InterruptedException propagates out of the test.
if ( !kafkaProducerErrorHandler.areBrokersUp(brokers) )
{
synchronized (lock)
{
for ( int i=0;
i <= 15 && !kafkaProducerErrorHandler.areBrokersUp(brokers)
&& registry.isRunning();
++i )
{ lock.wait(Duration.ofSeconds(1).toMillis()); }
}
}
//TODO: test Scheduled Fire
//The switch-back is invoked directly here rather than by waiting for its schedule.
kafkaProducerErrorHandler.primarySwitch();
//Allow for the switch back to primary to happen (same bounded wait as the fail-over above)
if ( !kafkaSwitchCluster.isPrimary() )
{
try
{
synchronized (lock)
{ //pause to give Idle Event a chance to fire
for (int i = 0; i <= timeOut && !kafkaSwitchCluster.isPrimary(); ++i)
//while ( !ktSwitch.isPrimary() )
{ //poll for cluster switch
lock.wait(Duration.ofSeconds(15).toMillis());
}
}
}
//An interrupt fails the test; NOTE(review): the thread's interrupt flag is not restored here.
catch (InterruptedException IGNORE)
{ fail("Unable to wait for cluster switch. " + IGNORE.getMessage()); }
}
//Confirm the switch back: the broker address is unchanged, is the primary, and is reachable
assertThat(brokers, anyOf(is(Rochelle), is(Hudson))); //port didn't change
assertThat(brokers, is(Rochelle)); //is primary
assertThat(kafkaSwitchCluster.isPrimary(), is(true));
//assertThat(ktSwitch.isPrimary(), is(true));
assertThat(ktSwitch.get(), is(brokers));
assertThat(kafkaProducerErrorHandler.areBrokersUp(brokers), is(true));
assertThat(kafkaProducerErrorHandler.areBrokersUp(Rochelle), is(true));
assertThat(ktSwitch.isPrimary(), is(true));
//assertThat(ktSwitch.get(), not(anyOf(is(Hudson), is(Rochelle))));
assertThat(ktSwitch.get(), is(embeddedKafka.getBrokersAsString()));
//Send a message now that the switch reports the primary again
//and expect to read the same text back from records within 10 seconds
message = "Test newPrimary";
send(message);
msg = records.poll(10, TimeUnit.SECONDS);
assertThat(msg, notNullValue());
assertThat(msg, is(message));
log.info("Test is finished");
}
我正在使用这种方法关闭我的主嵌入式 Kafka
/**
 * Stops every Kafka server held by {@code embeddedKafka}.
 *
 * Shutdown is requested on all servers first; only afterwards does the method
 * block on each server until its shutdown has completed.
 */
public void shutdownBroker_primary()
{
    // Pass 1: ask each server to shut down.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::shutdown);
    // Pass 2: wait for each server to finish shutting down.
    embeddedKafka.getKafkaServers().forEach(KafkaServer::awaitShutdown);
}
我正在使用它来重启 Kafka:
/**
 * Calls {@code startup()} on every Kafka server held by {@code embeddedKafka}
 * and then starts {@code registry}.
 *
 * NOTE(review): the surrounding question reports that the primary does not come
 * back successfully after this call; the answer in this document restarts the
 * broker with EmbeddedKafkaBroker.destroy() followed by afterPropertiesSet()
 * instead. Behaviour here is intentionally left as posted.
 */
public void startup_primary()
{
    // Earlier attempts, left disabled:
    //registry.stop();
    //kafkaSwitchCluster.Rochelle = embeddedKafka.getBrokersAsString();
    embeddedKafka.getKafkaServers().forEach(KafkaServer::startup);
    registry.start();
}
primarySwitch()
是将集群切换回主集群的计划事件。它在测试中直接调用。它是围绕相同代码的包装器,可在 Kafka 出现故障时切换正在使用的集群。
如何让主嵌入式 Kafka 集群在关闭后成功启动,以便我可以证明应用程序可以在主集群再次可用后成功移回主集群?
更新:
我已经在 Github 上用我目前所拥有的创建了代码示例:https://github.com/raystorm/Kafka-Example .
更新 2:上面链接的代码仓库已经根据下面被采纳的答案进行了更新,现在所有测试都通过了。
它并不是真正为这个用例设计的,但只要您不需要在代理实例之间保留数据,以下内容就可以工作...
/**
 * Demonstrates restarting a single embedded Kafka broker inside one test:
 * a record is sent, the broker is destroyed and re-initialised, and a second
 * record is sent through the same template.
 */
@SpringBootTest
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template) throws Exception {
        sendAndPrint(template, "foo");

        // Tear the broker down, then bring it back up through its init callback.
        this.broker.destroy();
        this.broker.afterPropertiesSet();

        sendAndPrint(template, "bar");
    }

    /** Sends {@code value} to the test topic, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> template, String value) throws Exception {
        SendResult<String, String> outcome = template.send("so64145670", value).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }
}
编辑
下面是一个使用两个 broker(代理)的示例...
/**
 * Demonstrates restarting two embedded Kafka brokers inside one test: a record
 * is sent to each broker, both brokers are destroyed and re-initialised, and a
 * second record is sent to each through the same templates.
 */
@SpringBootTest(classes = { So64145670Application.class, So64145670ApplicationTests.Config.class })
@EmbeddedKafka(topics = "so64145670", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So64145670ApplicationTests {

    // NOTE(review): two EmbeddedKafkaBroker beans exist in this context; the field
    // names presumably select between them, so they are kept unchanged - confirm.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private EmbeddedKafkaBroker secondBroker;

    @Test
    void restartBroker(@Autowired KafkaTemplate<String, String> template,
            @Autowired ProducerFactory<String, String> pf) throws Exception {
        sendAndPrint(template, "so64145670", "foo");

        // Second template: same producer factory, bootstrap servers overridden to the second broker.
        String secondServers = this.secondBroker.getBrokersAsString();
        KafkaTemplate<String, String> secondTemplate =
                new KafkaTemplate<>(pf, Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, secondServers));
        sendAndPrint(secondTemplate, "so64145670-1", "foo");

        // Tear both brokers down ...
        this.embeddedKafka.destroy();
        this.secondBroker.destroy();
        // ... and bring them back up in the same order.
        this.embeddedKafka.afterPropertiesSet();
        this.secondBroker.afterPropertiesSet();

        sendAndPrint(template, "so64145670", "bar");
        sendAndPrint(secondTemplate, "so64145670-1", "bar");
    }

    /** Sends {@code value} to {@code topic}, waits up to 10 seconds, and prints the record metadata. */
    private static void sendAndPrint(KafkaTemplate<String, String> sender, String topic, String value)
            throws Exception {
        SendResult<String, String> outcome = sender.send(topic, value).get(10, TimeUnit.SECONDS);
        System.out.println("+++" + outcome.getRecordMetadata());
    }

    /** Registers the second embedded broker with its own topic and broker-list property. */
    @Configuration
    public static class Config {

        @Bean
        EmbeddedKafkaBroker secondBroker() {
            EmbeddedKafkaBroker second = new EmbeddedKafkaBroker(1, true, "so64145670-1");
            return second.brokerListProperty("spring.kafka.second.server");
        }
    }
}
+++so64145670-1@0
+++so64145670-1-0@0
+++so64145670-1@0
+++so64145670-1-0@0