KStream - KTable 加入不触发

KStream - KTable Join not triggering

我有 2 个主题(实际上更多,但这里保持简单),我使用 Streams DSL 加入,加入后,将数据发布到下游。

I am creating a KTable on top of Topic 1 and storing it into a named state store. Key for Topic1 looks like below:

{  sourceCode:"WXYZ",
    platformCode:"ABCD",
    transactionIdentifier:"012345:01:55555:12345000:1"
}

我在更改日志主题中看到了预期的数据。

There is a KStream on top of topic 2. Key for Topic2 looks like below:

{  sourceCode:"WXYZ",
   platformCode:"ABCD",
   transactionIdentifier:"012345:01:55555:12345000:1"
   lineIdentifier:"1"
}

由于主题 1 和主题 2 中的数据之间存在一对多关系,因此我正在重新加密和聚合来自主题 2 的数据并将其放入另一个命名的状态存储中。 重新键入数据后,主题 2 中的键看起来与主题 1 中的键相同。我可以看到重新分区主题中重新键入的数据以及变更日志主题中的聚合数据,如预期的那样。但是,连接没有被触发。

其他关键细节 –

  1. 所有主题中的数据均采用 Avro 格式。
  2. 我正在使用 Java/Spring 启动。
  3. 我在 commit.interval.ms[= 上保留了默认设置51=]

有什么地方我可能做错了什么吗?

编辑 1:我查看了数据分区,看起来一个在 14 上,另一个在 20 上。我还发现了一个 similar question

编辑 2:topic1 和 topic2 的生产者是一个 golang 应用程序。 流还原消费者具有以下配置:

partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]

流消费者具有以下配置:

partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]

我在下面发布答案,以帮助其他人从此类问题中寻找必杀技。正如链接问题的评论部分所指出的,这是一个由生产者应用程序引起的问题。

Producer application is written in golang and hence, its hashing is different than Java, which is what I am using for joining data using Streams DSL.

早些时候,这就是我阅读 KTable 的方式,它保持与源主题中相同的分区:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder.table(inputTopic1, Materialized.as(transactionStore));
}

我重写了如下代码,以达到预期的效果:

@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {

    SpecificAvroSerde<MyKey> keySpecificAvroSerde = myKeySpecificAvroSerde();
    SpecificAvroSerde<MyValue> valueSpecificAvroSerde = mySpecificAvroSerde();

    streamsBuilder.stream(inputTopic1, Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde)).
            selectKey((key, value) -> new MyKey(key.get1(), key.get2(), key.get3())).
        to("dummyTopic", Produced.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    return streamsBuilder.table("dummyTopic",
            Materialized.<MyKey, MyValue, KeyValueStore<Bytes, byte[]>>as("myStateStore").
                   withKeySerde(keySpecificAvroSerde).withValueSerde(valueSpecificAvroSerde));
}