Spring Cloud Stream Kafka Streams Binder 3.x:在多个输出绑定的情况下没有输出到第二个输出主题
Spring Cloud Stream Kafka Streams Binder 3.x: No output to the second output topic in case of multiple output bindings
我有以下处理器 bean 方法签名:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
return (inputStream1, intputStream2) -> {
intputStream2
.peek((k, v) -> {
log.debug(...);
});
return inputStream1
.mapValues(...)
.branch((k,v) -> true, (k,v) -> true);
};
}
相关属性:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
myStream-in-0:
destination: inp0
myStream-in-1:
destination: inp1
myStream-out-0:
destination: out0
myStream-out-1:
destination: out1
Spring Cloud Kafka Stream 版本 Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE),内嵌 Kafka 版本 2.5。 0.
我正在使用嵌入式 Kafka 测试我的拓扑:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
"inp0", "inp1", "out0", "out1"
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
@Test
public void embeddedKafkaTest() throws IOException, InterruptedException {
Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
latch = new CountDownLatch(1);
// ... publish ...
latch.await(15, TimeUnit.SECONDS);
ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
}
private <K,V> Consumer<K, V> createConsumer(String groupName) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}
我的测试表明,来自 myStream
的消息按预期到达并到达主题“out0”,但“out1”主题仍然为空,并且单元测试在第二个断言中失败。
我已经尝试了几件事,但看起来好像根本就没有生成到第二个输出主题的输出(到第一个输出主题的输出生成良好)。
你能看出我的设置有什么错误吗?
还有一件事:myStream bean 方法定义中的 return 语句显示编译器警告:
Unchecked generics array creation for varargs parameter
但看起来 Spring Cloud Kafka Stream 3.x API 需要定义 return 类型?
您将两个谓词传递给 branch
方法,并且它们的计算结果始终为 true
。第一个谓词总是获胜并为第一个输出绑定生成数据。分支方法调用在第一个谓词评估为真后退出。有关详细信息,请参阅 javadoc。您应该使用不同的谓词(可能检查 key/value 上的某些条件)。如果第一个谓词失败而第二个谓词成功,那么您将看到为第二个输出主题生成的数据。
关于那个编译器警告,我认为您可以安全地忽略它,因为 API 将确保传递给 branch
调用的谓词对象具有正确的类型。由于该方法的实现使用通用可变参数,因此您会遇到该异常。有关该编译器警告的详细信息,请参阅 this 线程。
我有以下处理器 bean 方法签名:
@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
return (inputStream1, intputStream2) -> {
intputStream2
.peek((k, v) -> {
log.debug(...);
});
return inputStream1
.mapValues(...)
.branch((k,v) -> true, (k,v) -> true);
};
}
相关属性:
spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
myStream-in-0:
destination: inp0
myStream-in-1:
destination: inp1
myStream-out-0:
destination: out0
myStream-out-1:
destination: out1
Spring Cloud Kafka Stream 版本 Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE),内嵌 Kafka 版本 2.5。 0.
我正在使用嵌入式 Kafka 测试我的拓扑:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
"inp0", "inp1", "out0", "out1"
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
@Test
public void embeddedKafkaTest() throws IOException, InterruptedException {
Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");
latch = new CountDownLatch(1);
// ... publish ...
latch.await(15, TimeUnit.SECONDS);
ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
assertThat(out0.count(), is(greaterThanOrEqualTo(1)));
ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
assertThat(out1.count(), is(greaterThanOrEqualTo(1)));
}
private <K,V> Consumer<K, V> createConsumer(String groupName) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}
我的测试表明,来自 myStream
的消息按预期到达并到达主题“out0”,但“out1”主题仍然为空,并且单元测试在第二个断言中失败。
我已经尝试了几件事,但看起来好像根本就没有生成到第二个输出主题的输出(到第一个输出主题的输出生成良好)。
你能看出我的设置有什么错误吗?
还有一件事:myStream bean 方法定义中的 return 语句显示编译器警告:
Unchecked generics array creation for varargs parameter
但看起来 Spring Cloud Kafka Stream 3.x API 需要定义 return 类型?
您将两个谓词传递给 branch
方法,并且它们的计算结果始终为 true
。第一个谓词总是获胜并为第一个输出绑定生成数据。分支方法调用在第一个谓词评估为真后退出。有关详细信息,请参阅 javadoc。您应该使用不同的谓词(可能检查 key/value 上的某些条件)。如果第一个谓词失败而第二个谓词成功,那么您将看到为第二个输出主题生成的数据。
关于那个编译器警告,我认为您可以安全地忽略它,因为 API 将确保传递给 branch
调用的谓词对象具有正确的类型。由于该方法的实现使用通用可变参数,因此您会遇到该异常。有关该编译器警告的详细信息,请参阅 this 线程。