Spring Cloud Streams 的 StreamRetryTemplate 未在集成测试中重试
StreamRetryTemplate for Spring Cloud Streams not retrying in integration tests
我们正在利用 Spring Cloud Streams 来侦听 Kafka 主题并调用休息服务。我们还实现了一个自定义 StreamRetryTemplate 来指定我们认为可以恢复的错误类型,以及我们认为不可恢复的错误类型。我无法在它在 运行 时间的工作方式和它在集成测试中的工作方式之间获得一致的结果。
我已经在调试模式下验证异常被正确抛出并且 RetryTemplate 被正确注入,但它似乎没有在我的集成测试中使用。
@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {
@StreamListener(Sink.Input)
fun consume(@Payload msg: MyMessage) = myService.process(msg)
@SteamRetryTemplate
fun getRetryTemplate() = RetryTemplate()
}
当我 运行 这个应用程序和 myService 抛出异常时,我希望它能被重试,而且它做得很好。但是,当我使用 wiremock 服务器编写集成测试并让 myService 抛出异常时,它不会重试。我有断言语句来验证我的 wiremock 端点被击中了多少次。
我是否遗漏了一些专门用于重试以在集成测试中工作的东西?
你用的是test binder还是嵌入式kafka broker?测试活页夹相当有限;使用嵌入式代理是完整集成测试的首选。
见Testing Applications in the Spring for Apache Kafka Documentation。
编辑
@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {
public static void main(String[] args) {
SpringApplication.run(So55855151Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("fail");
}
@StreamRetryTemplate
public RetryTemplate retrier() {
return new RetryTemplate();
}
}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private RetryTemplate retrier;
@Test
public void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
this.retrier.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("open");
latch.countDown();
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("close");
latch.countDown();
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("onError: " + throwable);
latch.countDown();
}
});
this.template.send("input", "test".getBytes());
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}
我们正在利用 Spring Cloud Streams 来侦听 Kafka 主题并调用休息服务。我们还实现了一个自定义 StreamRetryTemplate 来指定我们认为可以恢复的错误类型,以及我们认为不可恢复的错误类型。我无法在它在 运行 时间的工作方式和它在集成测试中的工作方式之间获得一致的结果。
我已经在调试模式下验证异常被正确抛出并且 RetryTemplate 被正确注入,但它似乎没有在我的集成测试中使用。
@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {
@StreamListener(Sink.Input)
fun consume(@Payload msg: MyMessage) = myService.process(msg)
@SteamRetryTemplate
fun getRetryTemplate() = RetryTemplate()
}
当我 运行 这个应用程序和 myService 抛出异常时,我希望它能被重试,而且它做得很好。但是,当我使用 wiremock 服务器编写集成测试并让 myService 抛出异常时,它不会重试。我有断言语句来验证我的 wiremock 端点被击中了多少次。
我是否遗漏了一些专门用于重试以在集成测试中工作的东西?
你用的是test binder还是嵌入式kafka broker?测试活页夹相当有限;使用嵌入式代理是完整集成测试的首选。
见Testing Applications in the Spring for Apache Kafka Documentation。
编辑
@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {
public static void main(String[] args) {
SpringApplication.run(So55855151Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
throw new RuntimeException("fail");
}
@StreamRetryTemplate
public RetryTemplate retrier() {
return new RetryTemplate();
}
}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {
@Autowired
private KafkaTemplate<byte[], byte[]> template;
@Autowired
private RetryTemplate retrier;
@Test
public void test() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
this.retrier.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
System.out.println("open");
latch.countDown();
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("close");
latch.countDown();
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("onError: " + throwable);
latch.countDown();
}
});
this.template.send("input", "test".getBytes());
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
}