Kafka Streams 不触发加入流的输出?
Kafka Streams not triggering output for joined streams?
我有来自 3 mysql table 的原始流,1 个主流和两个 child table。我试图加入三个原始流并转换为单个输出流。如果 parent 流上有任何更新,它会起作用,但如果 child 流上有任何变化则不会触发输出。
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
KTable<Long, Parent> parentTable = convertParent(parentStream);
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
parentTable
.leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
.leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
.toStream()
}
parent 流上的任何新添加或更新都会被处理器拾取,并将其与其他 KTable 和 return 连接到输出流。但是 child1stream 或 child2stream 上的任何添加或更新都不会触发输出流。
我认为将所有输入流设为 KTable,它们将始终存储更改,因为它们都具有相同的键,并且将选择 parent 或 child table 上的任何更新连接起来。但它没有发生,任何人都可以建议我在这方面缺少什么吗?
我已经尝试了 KStream-KStream、Stream-KTable、KTable-KTable 连接,其中 none 在 child 更新的情况下有效。
-谢谢
请注意您的子表是如何从与父表相同的流中创建的:
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
不确定 convertChild1 和 convertChild2 方法的作用,但难道不应该分别给它们 child1Stream 和 child2Stream 作为参数吗?
你能展示你在哪里拥有 EnableBinding
和你绑定的处理器接口吗?
我觉得这不对:
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
您没有在输入上指定绑定。当你有多个输入时,你需要有这样的东西:
@StreamListener
public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
@Input("input2") KStream<Long, Child1> child1Stream,
@Input("input3") KStream<Long, Child2> child2Stream) {
这些输入中的每一个都需要在处理器接口中定义。示例见此处:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46
我有来自 3 mysql table 的原始流,1 个主流和两个 child table。我试图加入三个原始流并转换为单个输出流。如果 parent 流上有任何更新,它会起作用,但如果 child 流上有任何变化则不会触发输出。
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
KTable<Long, Parent> parentTable = convertParent(parentStream);
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
parentTable
.leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
.leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
.toStream()
}
parent 流上的任何新添加或更新都会被处理器拾取,并将其与其他 KTable 和 return 连接到输出流。但是 child1stream 或 child2stream 上的任何添加或更新都不会触发输出流。
我认为将所有输入流设为 KTable,它们将始终存储更改,因为它们都具有相同的键,并且将选择 parent 或 child table 上的任何更新连接起来。但它没有发生,任何人都可以建议我在这方面缺少什么吗?
我已经尝试了 KStream-KStream、Stream-KTable、KTable-KTable 连接,其中 none 在 child 更新的情况下有效。
-谢谢
请注意您的子表是如何从与父表相同的流中创建的:
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
不确定 convertChild1 和 convertChild2 方法的作用,但难道不应该分别给它们 child1Stream 和 child2Stream 作为参数吗?
你能展示你在哪里拥有 EnableBinding
和你绑定的处理器接口吗?
我觉得这不对:
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
您没有在输入上指定绑定。当你有多个输入时,你需要有这样的东西:
@StreamListener
public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
@Input("input2") KStream<Long, Child1> child1Stream,
@Input("input3") KStream<Long, Child2> child2Stream) {
这些输入中的每一个都需要在处理器接口中定义。示例见此处:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46