Kafka streams - joining two ktables invokes join function twice
I am trying to join two KTables.
KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());
The merge function is very simple; I just copy a value from one bean to the other.
public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
    return (v1, v2) -> {
        v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
        return v1;
    };
}
But for some reason the join function is invoked twice for a single produced record. See the streams/producer configuration below.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
.getAbsolutePath());
return streamsConfiguration;
Producer configuration -
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return producerConfig;
Next I submit a single record to each stream. Both records have the same key. I expect to receive a single record as output.
IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
Arrays.asList(new KeyValue("1", getRecordBean("1"))),
getProducerProperties());
IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
Arrays.asList(new KeyValue("1", getImpressionBean("1"))),
getProducerProperties());
List<KeyValue<String, String>> parsedRecord =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
outputTopic, 1);
But the ValueJoiner fires twice, and I get two identical output records instead of one. At the time it fires, both values from both streams are present, so I cannot figure out what triggers the second execution.
Without the join I cannot reproduce this behavior. I could not find any working example of a join of two KTables, so I cannot understand what is wrong with my approach.
Adding simple code that demonstrates the same behavior:
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");
KTable<String, String> joined = first.join(second, (value1, value2) -> value1);
joined.to("output");
KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
Arrays.asList(new KeyValue("1", "first stream")),
getProducerProperties());
IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
Arrays.asList(new KeyValue("1", "second stream")),
getProducerProperties());
List<KeyValue<String, String>> parsedRecord =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
"output", 1);
I got the following explanation after posting a similar question to the Confluent mailing list.
I think this might be related to caching. The caches for the 2 tables are flushed independently, so there is a chance you will get the same record twice. If stream1 and stream2 both receive a record for the same key, and the cache flushes, then:
The cache from stream1 will flush, perform the join, and produce a record.
The cache from stream2 will flush, perform the join, and produce a record.
Technically this is ok as the result of the join is another KTable, so the value in the KTable will be the correct value.
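The flush sequence described above can be sketched as a toy model in plain Java (no Kafka involved; the table names and joiner are made up for illustration): both tables already hold a value for key "1", and each cache flush independently re-runs the joiner for that key, so the same join result is emitted twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model of two independently flushed KTable caches over the same key.
public class DoubleFlushDemo {
    public static void main(String[] args) {
        Map<String, String> table1 = new HashMap<>();
        Map<String, String> table2 = new HashMap<>();
        List<String> output = new ArrayList<>();
        BiFunction<String, String, String> joiner = (v1, v2) -> v1 + "+" + v2;

        // Both sides receive a record for the same key before either cache flushes.
        table1.put("1", "first stream");
        table2.put("1", "second stream");

        // Flush of table1's cache: key "1" changed, so the join runs and emits.
        output.add(joiner.apply(table1.get("1"), table2.get("1")));
        // Flush of table2's cache: key "1" changed here too, so the join runs again.
        output.add(joiner.apply(table1.get("1"), table2.get("1")));

        // Two identical join results for one logical update.
        System.out.println(output);
    }
}
```

Both emissions carry the same (correct) value, which is why the behavior is semantically fine for a KTable result even though it looks like a duplicate.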
After setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, the problem was solved. I still got two records, but now one of them was joined with null, and that behavior is clearly explained by the join-semantics documentation referenced above.
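Disabling the cache amounts to one extra property in the Streams configuration. The sketch below uses the literal key "cache.max.bytes.buffering" (which is the value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG in the versions discussed here) so it runs without the Kafka dependency; the helper name is made up for illustration.

```java
import java.util.Properties;

public class DisableCaching {
    // Returns a copy of the given Streams config with record caching turned off.
    // "cache.max.bytes.buffering" is the literal value of
    // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static Properties withCachingDisabled(Properties base) {
        Properties p = new Properties();
        p.putAll(base);
        p.put("cache.max.bytes.buffering", "0");
        return p;
    }

    public static void main(String[] args) {
        Properties p = withCachingDisabled(new Properties());
        System.out.println(p.getProperty("cache.max.bytes.buffering"));
    }
}
```

With the cache off, every upstream update flows through the join immediately, so you see each intermediate result (including the join with null when only one side has arrived) instead of whatever state the two caches happened to flush.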
I found the same behavior using leftJoin between two KTables and stumbled across this post after googling. I don't know which kafka-streams version you were using, but after debugging the Confluent code, kafka-streams version 2.0.1 appears to deliberately send both the old and the new value in certain types of joins, so the ValueJoiner gets invoked twice.
Look at the implementation of org.apache.kafka.streams.kstream.internals.KTableImpl#buildJoin, which builds the join topology, and of org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin.KTableKTableRightJoinProcessor#process, which dispatches it at runtime. In some cases it is clearly done twice.
Here is some background on this behavior: https://issues.apache.org/jira/browse/KAFKA-2984