Kafka Streams 不触发加入流的输出?

Kafka Streams not triggering output for joined streams?

我有来自 3 mysql table 的原始流,1 个主流和两个 child table。我试图加入三个原始流并转换为单个输出流。如果 parent 流上有任何更新,它会起作用,但如果 child 流上有任何变化则不会触发输出。

    @StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

    KTable<Long, Parent> parentTable = convertParent(parentStream);
    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

    parentTable
           .leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
           .leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
           .toStream()
        }

parent 流上的任何新添加或更新都会被处理器拾取,并将其与其他 KTable 和 return 连接到输出流。但是 child1stream 或 child2stream 上的任何添加或更新都不会触发输出流。

我认为将所有输入流设为 KTable,它们将始终存储更改,因为它们都具有相同的键,并且将选择 parent 或 child table 上的任何更新连接起来。但它没有发生,任何人都可以建议我在这方面缺少什么吗?

我已经尝试了 KStream-KStream、Stream-KTable、KTable-KTable 连接,其中 none 在 child 更新的情况下有效。

-谢谢

请注意您的子表是如何从与父表相同的流中创建的:

KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

不确定 convertChild1 和 convertChild2 方法的作用,但难道不应该分别给它们 child1Stream 和 child2Stream 作为参数吗?

你能展示你在哪里拥有 EnableBinding 和你绑定的处理器接口吗?

我觉得这不对:

@StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

您没有在输入上指定绑定。当你有多个输入时,你需要有这样的东西:

@StreamListener
        public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
        @Input("input2") KStream<Long, Child1> child1Stream,
        @Input("input3") KStream<Long, Child2> child2Stream) {

这些输入中的每一个都需要在处理器接口中定义。示例见此处:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46