Spring Cloud Stream Kafka Streams Binder 3.x:在多个输出绑定的情况下没有输出到第二个输出主题

Spring Cloud Stream Kafka Streams Binder 3.x: No output to the second output topic in case of multiple output bindings

我有以下处理器 bean 方法签名:

@Bean
public BiFunction<KStream<String, MyClass>, KStream<String, String>, KStream<String, MyClass>[]> myStream() {
    return (inputStream1, intputStream2) -> {

        intputStream2
            .peek((k, v) -> {
                log.debug(...);
            });

        return inputStream1
            .mapValues(...)
            .branch((k,v) -> true, (k,v) -> true);

    };
}

相关属性:

spring.cloud.stream.function.definition: ...;myStream
spring.cloud.stream.bindings:
  myStream-in-0:
    destination: inp0
  myStream-in-1:
    destination: inp1
  myStream-out-0:
    destination: out0
  myStream-out-1:
    destination: out1

Spring Cloud Kafka Stream 版本 Hoxton.SR4 (spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE),内嵌 Kafka 版本 2.5。 0.

我正在使用嵌入式 Kafka 测试我的拓扑:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
            "inp0", "inp1", "out0", "out1"
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

    @Test
    public void embeddedKafkaTest() throws IOException, InterruptedException {
        Consumer<String, MyClass> out0Consumer = createConsumer("out0ConsumerGroup");
        Consumer<String, MyClass> out1Consumer = createConsumer("out1ConsumerGroup");

        this.embeddedKafka.consumeFromAnEmbeddedTopic(out0Consumer, "out0");
        this.embeddedKafka.consumeFromAnEmbeddedTopic(out1Consumer, "out1");

        latch = new CountDownLatch(1);
        // ... publish ...
        latch.await(15, TimeUnit.SECONDS);

        ConsumerRecords<String, MyClass> out0 = KafkaTestUtils.getRecords(out0Consumer);
        assertThat(out0.count(), is(greaterThanOrEqualTo(1)));

        ConsumerRecords<String, MyClass> out1 = KafkaTestUtils.getRecords(out1Consumer);
        assertThat(out1.count(), is(greaterThanOrEqualTo(1)));

    }

private <K,V> Consumer<K, V> createConsumer(String groupName) {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(groupName, "true", this.embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<K, V>(consumerProps).createConsumer();
}

我的测试表明,来自 myStream 的消息按预期到达并到达主题“out0”,但“out1”主题仍然为空,并且单元测试在第二个断言中失败。

我已经尝试了几件事,但看起来好像根本就没有生成到第二个输出主题的输出(到第一个输出主题的输出生成良好)。

你能看出我的设置有什么错误吗?

还有一件事:myStream bean 方法定义中的 return 语句显示编译器警告:

Unchecked generics array creation for varargs parameter

但看起来 Spring Cloud Kafka Stream 3.x API 需要定义 return 类型?

您将两个谓词传递给 branch 方法,并且它们的计算结果始终为 true。第一个谓词总是获胜并为第一个输出绑定生成数据。分支方法调用在第一个谓词评估为真后退出。有关详细信息,请参阅 javadoc。您应该使用不同的谓词(可能检查 key/value 上的某些条件)。如果第一个谓词失败而第二个谓词成功,那么您将看到为第二个输出主题生成的数据。

关于那个编译器警告,我认为您可以安全地忽略它,因为 API 将确保传递给 branch 调用的谓词对象具有正确的类型。由于该方法的实现使用通用可变参数,因此您会遇到该异常。有关该编译器警告的详细信息,请参阅 this 线程。