Kafka Streams 减少与抑制

Kafka Streams Reduce vs Suppress

在阅读 suppress() 文档时,我看到除非记录发布到主题,否则时间 window 不会提前,因为它基于事件时间。现在,我的代码正在输出每个键的最终值,因为该主题的流量是恒定的,但是当该系统关闭时会出现停机时间,导致状态存储中的现有记录被“冻结”。我想知道只使用 reduce() 而不是 reduce().suppress() 之间有什么区别。 reduce() 是否像 suppress() 一样,因为它们都是事件时间驱动的?我的理解是两者都在做同样的事情,聚合一定时间内的keywindow.

我的拓扑如下:

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
    final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
    keySpecificAvroSerde.configure(serdeConfig, true);
    final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
    valueSpecificAvroSerde.configure(serdeConfig, false);

    // KStream<EligibilityKey, Eligibility>
    KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
            Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));

    // KStream<EligibilityKey, String>
    KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
            .mapValues((key, value) -> Processor.process(key, value));

    // WindowBytesStoreSupplier
    WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
            Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);

    // Materialized
    Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
            .as(windowBytesStoreSupplier);
    materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());

    // TimeWindows
    TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
            .grace(Duration.ofSeconds(afterWindowEnd));

    // KTable<Windowed<EligibilityKey>, String>
    KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
            .groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
            .reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));

    // KStream<Windowed<EligibilityKey>, String>
    KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();

通过使用 reduce() 而不使用抑制,聚合的结果会不断更新,即,对包含 reduce() 结果的 KTable 的更新也在所有记录之前发送到下游window 已处理。

假设一个 reduce 只是将持续时间 3 的 window 中的值与宽限度 0 和以下输入记录(键、值、时间戳)相加到 reduce():

  • W1的输入记录(A,1,1)->输出记录((W1,A),1)被发送到下游
  • W1的输入记录(A,2,2)->输出记录((W1,A),3)发送到下游
  • W1的输入记录(A,3,3)->输出记录((W1,A),6)发送到下游
  • W2的输入记录(A,4,4)->输出记录((W2,A),4)发送到下游

使用 reduce().suppress(),结果会被缓冲,直到 window 关闭。结果将是:

  • W1的输入记录(A,1,1)->无输出
  • W1的输入记录(A,2,2)->无输出
  • W1的输入记录(A,3,3)->无输出
  • W2的输入记录(A,4,4)->输出记录((W1,A),6)发送到下游

请注意,对于没有 suppress() 的情况,我假设使用 cache.max.bytes.buffering = 0 关闭了缓存。使用 cache.max.bytes.buffering > 0(默认为 10MB),缓存将缓冲 KTable 的输出记录,一旦缓存已满,它将输出具有最近最少更新的键的记录。