Kafka Streams 减少与抑制
Kafka Streams Reduce vs Suppress
在阅读 suppress() 文档时,我看到除非记录发布到主题,否则时间 window 不会提前,因为它基于事件时间。现在,我的代码正在输出每个键的最终值,因为该主题的流量是恒定的,但是当该系统关闭时会出现停机时间,导致状态存储中的现有记录被“冻结”。我想知道只使用 reduce() 而不是 reduce().suppress() 之间有什么区别。 reduce() 是否像 suppress() 一样,因为它们都是事件时间驱动的?我的理解是两者都在做同样的事情,聚合一定时间内的keywindow.
我的拓扑如下:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
keySpecificAvroSerde.configure(serdeConfig, true);
final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
valueSpecificAvroSerde.configure(serdeConfig, false);
// KStream<EligibilityKey, Eligibility>
KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
// KStream<EligibilityKey, String>
KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
.mapValues((key, value) -> Processor.process(key, value));
// WindowBytesStoreSupplier
WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);
// Materialized
Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
.as(windowBytesStoreSupplier);
materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());
// TimeWindows
TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
.grace(Duration.ofSeconds(afterWindowEnd));
// KTable<Windowed<EligibilityKey>, String>
KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
.groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
.reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));
// KStream<Windowed<EligibilityKey>, String>
KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
通过使用 reduce()
而不使用抑制,聚合的结果会不断更新,即,对包含 reduce()
结果的 KTable 的更新也在所有记录之前发送到下游window 已处理。
假设一个 reduce 只是将持续时间 3 的 window 中的值与宽限度 0 和以下输入记录(键、值、时间戳)相加到 reduce()
:
- W1的输入记录(A,1,1)->输出记录((W1,A),1)被发送到下游
- W1的输入记录(A,2,2)->输出记录((W1,A),3)发送到下游
- W1的输入记录(A,3,3)->输出记录((W1,A),6)发送到下游
- W2的输入记录(A,4,4)->输出记录((W2,A),4)发送到下游
使用 reduce().suppress()
,结果会被缓冲,直到 window 关闭。结果将是:
- W1的输入记录(A,1,1)->无输出
- W1的输入记录(A,2,2)->无输出
- W1的输入记录(A,3,3)->无输出
- W2的输入记录(A,4,4)->输出记录((W1,A),6)发送到下游
请注意,对于没有 suppress()
的情况,我假设使用 cache.max.bytes.buffering = 0
关闭了缓存。使用 cache.max.bytes.buffering > 0
(默认为 10MB),缓存将缓冲 KTable 的输出记录,一旦缓存已满,它将输出具有最近最少更新的键的记录。
在阅读 suppress() 文档时,我看到除非记录发布到主题,否则时间 window 不会提前,因为它基于事件时间。现在,我的代码正在输出每个键的最终值,因为该主题的流量是恒定的,但是当该系统关闭时会出现停机时间,导致状态存储中的现有记录被“冻结”。我想知道只使用 reduce() 而不是 reduce().suppress() 之间有什么区别。 reduce() 是否像 suppress() 一样,因为它们都是事件时间驱动的?我的理解是两者都在做同样的事情,聚合一定时间内的keywindow.
我的拓扑如下:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
keySpecificAvroSerde.configure(serdeConfig, true);
final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
valueSpecificAvroSerde.configure(serdeConfig, false);
// KStream<EligibilityKey, Eligibility>
KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
// KStream<EligibilityKey, String>
KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
.mapValues((key, value) -> Processor.process(key, value));
// WindowBytesStoreSupplier
WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);
// Materialized
Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
.as(windowBytesStoreSupplier);
materialized = Materialized.with(keySpecificAvroSerde, Serdes.String());
// TimeWindows
TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
.grace(Duration.ofSeconds(afterWindowEnd));
// KTable<Windowed<EligibilityKey>, String>
KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
.groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
.reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));
// KStream<Windowed<EligibilityKey>, String>
KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
通过使用 reduce()
而不使用抑制,聚合的结果会不断更新,即,对包含 reduce()
结果的 KTable 的更新也在所有记录之前发送到下游window 已处理。
假设一个 reduce 只是将持续时间 3 的 window 中的值与宽限度 0 和以下输入记录(键、值、时间戳)相加到 reduce()
:
- W1的输入记录(A,1,1)->输出记录((W1,A),1)被发送到下游
- W1的输入记录(A,2,2)->输出记录((W1,A),3)发送到下游
- W1的输入记录(A,3,3)->输出记录((W1,A),6)发送到下游
- W2的输入记录(A,4,4)->输出记录((W2,A),4)发送到下游
使用 reduce().suppress()
,结果会被缓冲,直到 window 关闭。结果将是:
- W1的输入记录(A,1,1)->无输出
- W1的输入记录(A,2,2)->无输出
- W1的输入记录(A,3,3)->无输出
- W2的输入记录(A,4,4)->输出记录((W1,A),6)发送到下游
请注意,对于没有 suppress()
的情况,我假设使用 cache.max.bytes.buffering = 0
关闭了缓存。使用 cache.max.bytes.buffering > 0
(默认为 10MB),缓存将缓冲 KTable 的输出记录,一旦缓存已满,它将输出具有最近最少更新的键的记录。