Kafka Ktable 还流式传输重复更新

Kafka Ktable also streaming duplicate updates

Kafka Ktable 也流式传输重复更新。

我想处理 Ktable(使用 Kstream.reduce() 创建)更新日志流,即 Ktable 中键值的任何更改。但似乎即使将相同的键值对多次发送到 Ktable,它每次都发送到下游。仅当值发生变化时,我才需要在键的值中发送更新。

`

groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde())) 
                .reduce(new Reducer<Long>() {   
                    @Override
                    public Long apply(Long t1, Long t2) {
                        return t2;
                    }
                }).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
        {

            sendUpdate(key); 
        });

`

这是 KTable#toStream() 的默认行为,它将变更日志主题转换为 KStream,因此 reduce 的下游运算符在上游 reduce 运算符每次收到消息时都会更新。

您可以使用 Processor API 存档您的愿望行为,在这种情况下我们使用 KStream.transfomerValues()。

首先注册一个 KeyValueStore 来存储你的最新值:

//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

然后我们创建一个ExtractIfValueChangedTransformer,只有return新消息的值如果值已经改变,如果没有那么return null:

public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}

Kafka Streams 提供 2 种语义:emit-on-update 和 emit-on-window-close。

KIP-557 is about adding emit-on-change semantic based on byte array comparison of data. It has been implemented in Kafka Streams 2.6 and then removed due to "potential data loss".

尽管如此,我已经通过使用 Kafka Streams DSL 开发了 emit-on-change 语义的实现。

想法是将具有更新时发出语义的 KStream 转换为具有更改时发出语义的 KStream。您可以在您提供的源 Kstream 上使用此实现来创建 KTable,或者在应用 .toStream().

后在 KTable 上使用

此实现隐式创建了一个状态存储,其中值包含 KStream 数据和一个标志,指示是否应发出更新。这个标志是在聚合操作中设置的,是根据Object#equals进行比较的。但是您可以更改实现以使用 Comparator.

这里是 withEmitOnChange 改变 KStream 语义的方法。您可能必须为 EmitOnChangeState 数据结构指定一个 serde(见下文)。

public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams) {
    return streams
            .groupByKey()
            .aggregate(
                    () -> (EmitOnChangeState<V>) null,
                    (k, data, state) -> {
                        if (state == null) {
                            return new EmitOnChangeState<>(data, true);
                        } else {
                            return state.merge(data);
                        }
                    }
            )
            .toStream()
            .filter((k, state) -> state.shouldEmit)
            .mapValues(state -> (V) state.data);
}

这是存储在状态存储中的数据结构,用于检查是否应发出更新。

public static class EmitOnChangeState<T> {
    public final T data;
    public final boolean shouldEmit;
    public EmitOnChangeState(T data, boolean shouldEmit) {
        this.data = data;
        this.shouldEmit = shouldEmit;
    }
    public EmitOnChangeState<T> merge(T newData) {
        return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
        return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
    }
    @Override
    public int hashCode() {
        return Objects.hash(data, shouldEmit);
    }
}

用法:

KStream<ProductKey, Product> products = builder.stream("product-topic");

withEmitOnChange(products)
  .to("out-product-topic"); // output topic with emit-on-change semantic