墓碑消息未从 KTable 状态存储中删除记录?
Tombstone messages not removing record from KTable state store?
我正在创建 KTable 处理来自 KStream 的数据。但是,当我触发带有密钥和空负载的逻辑删除消息时,它不会从 KTable 中删除消息。
样本-
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
触发带有空值的消息后,我可以在带有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志中的记录 "test-store"。看起来它甚至没有达到 reduce 方法,不确定我在这里遗漏了什么。
感谢任何帮助!
谢谢。
如 reduce()
的 JavaDocs 中所述
Records with {@code null} key or value are ignored.
因为 <key,null>
记录被删除,因此 (genericRecord, v1) -> v1
永远不会执行,没有墓碑被写入存储或更新日志主题。
对于您想到的用例,您需要使用指示 "delete" 的替代值,例如 Avro 记录中的布尔标志。您的 reduce 函数需要检查标志和 return null
是否设置了标志;否则,它必须定期处理记录。
更新:
Apache Kafka 2.6 添加了 KStream#toTable()
运算符(通过 KIP-523),允许将 KStream
转换为 KTable
。
Matthias 对上述回答的补充:
Reduce 忽略流中的第一条记录,因此映射和分组的值将按原样存储在 KTable 中,永远不会通过 reduce 方法进行逻辑删除。这意味着不可能仅在 table 上加入另一个流,该值本身也需要评估。
希望 KIP-523 解决这个问题。
我正在创建 KTable 处理来自 KStream 的数据。但是,当我触发带有密钥和空负载的逻辑删除消息时,它不会从 KTable 中删除消息。
样本-
public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
.map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
.groupByKey()
reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));
GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);
触发带有空值的消息后,我可以在带有空负载的 testStream 映射函数中进行调试,但它不会删除 KTable 更改日志中的记录 "test-store"。看起来它甚至没有达到 reduce 方法,不确定我在这里遗漏了什么。
感谢任何帮助!
谢谢。
如 reduce()
Records with {@code null} key or value are ignored.
因为 <key,null>
记录被删除,因此 (genericRecord, v1) -> v1
永远不会执行,没有墓碑被写入存储或更新日志主题。
对于您想到的用例,您需要使用指示 "delete" 的替代值,例如 Avro 记录中的布尔标志。您的 reduce 函数需要检查标志和 return null
是否设置了标志;否则,它必须定期处理记录。
更新:
Apache Kafka 2.6 添加了 KStream#toTable()
运算符(通过 KIP-523),允许将 KStream
转换为 KTable
。
Matthias 对上述回答的补充:
Reduce 忽略流中的第一条记录,因此映射和分组的值将按原样存储在 KTable 中,永远不会通过 reduce 方法进行逻辑删除。这意味着不可能仅在 table 上加入另一个流,该值本身也需要评估。
希望 KIP-523 解决这个问题。