Kafka Streams - 创建窗口状态存储

Kafka Streams - Creating Windowed State Store

下面的代码 "works" 但我对 Stores.persistentWindowStore() 中传递的值的含义感到困惑。我找到了文档 (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-),但我不清楚 args 的定义。

windowBy() 值是否应始终与 persistentWindowStore() 中的 windowSize 匹配?

保留期限应设置为多少?源主题的保留策略?

段数有什么作用?

保留重复项有什么用?文档似乎指示将连接设置为 true?

long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize*4*6 //6 hours
int numSegments = 2;
boolean retainDuplicates = false;

bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
    .aggregate(() -> Lists.newArrayList(),
        (aggKey, newValue, aggValue) -> {
            BdrData d = new BdrData();
            d.setCharge(newValue.getBdr().getCost());
            aggValue.add(d);
            return aggValue;
        },
        Materialized.<String, ArrayList<BdrData>>as(
            Stores.persistentWindowStore("store5", 
                retentionPeriod, 
                numSegments, 
                windowSize,
                retainDuplicates))
                .withKeySerde(Serdes.String())
                .withValueSerde(listBdrDataSerde))
    .toStream()
    .process(() -> new WindowAggregatorProcessor());

Should the windowBy() value always match windowSize in persistentWindowStore()?

是的。

What should the retention period be set to? The retention policy of the source topic?

它应该匹配 windows 的保留期,您可以通过 Windows#until() 指定(默认为 1 天)

What do the number of segments do?

段数决定coarse/fine粒度数据(即旧windows)如何过期。分段大小将为 "retention-period / (#segments + 1)"。请注意,更多段会为您提供更细粒度的数据过期,但会增加开销(每个段使用它自己的 RocksDB 实例)

What are retaining duplicates for? Document seems to indicate to set to true for joins?

默认情况下,键必须是唯一的。如果启用保留重复项,则可以多次存储相同的密钥。启用重复会降低性能。

注:

API 的这一部分在即将发布的 2.1 版本中进行了重新设计和简化。比较 KIP-319 and KIP-328 了解详情。