Kafka Streams - 创建窗口状态存储
Kafka Streams - Creating Windowed State Store
下面的代码 "works" 但我对 Stores.persistentWindowStore() 中传递的值的含义感到困惑。我找到了文档 (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-),但我不清楚 args 的定义。
windowBy() 值是否应始终与 persistentWindowStore() 中的 windowSize 匹配?
保留期限应设置为多少?源主题的保留策略?
段数有什么作用?
保留重复项有什么用?文档似乎指示将连接设置为 true?
long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize*4*6 //6 hours
int numSegments = 2;
boolean retainDuplicates = false;
bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
.aggregate(() -> Lists.newArrayList(),
(aggKey, newValue, aggValue) -> {
BdrData d = new BdrData();
d.setCharge(newValue.getBdr().getCost());
aggValue.add(d);
return aggValue;
},
Materialized.<String, ArrayList<BdrData>>as(
Stores.persistentWindowStore("store5",
retentionPeriod,
numSegments,
windowSize,
retainDuplicates))
.withKeySerde(Serdes.String())
.withValueSerde(listBdrDataSerde))
.toStream()
.process(() -> new WindowAggregatorProcessor());
Should the windowBy() value always match windowSize in persistentWindowStore()?
是的。
What should the retention period be set to? The retention policy of the source topic?
它应该匹配 windows 的保留期,您可以通过 Windows#until()
指定(默认为 1 天)
What do the number of segments do?
段数决定coarse/fine粒度数据(即旧windows)如何过期。分段大小将为 "retention-period / (#segments + 1)"。请注意,更多段会为您提供更细粒度的数据过期,但会增加开销(每个段使用它自己的 RocksDB 实例)
What are retaining duplicates for? Document seems to indicate to set to true for joins?
默认情况下,键必须是唯一的。如果启用保留重复项,则可以多次存储相同的密钥。启用重复会降低性能。
注:
API 的这一部分在即将发布的 2.1 版本中进行了重新设计和简化。比较 KIP-319 and KIP-328 了解详情。
下面的代码 "works" 但我对 Stores.persistentWindowStore() 中传递的值的含义感到困惑。我找到了文档 (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-),但我不清楚 args 的定义。
windowBy() 值是否应始终与 persistentWindowStore() 中的 windowSize 匹配?
保留期限应设置为多少?源主题的保留策略?
段数有什么作用?
保留重复项有什么用?文档似乎指示将连接设置为 true?
long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize*4*6 //6 hours
int numSegments = 2;
boolean retainDuplicates = false;
bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
.aggregate(() -> Lists.newArrayList(),
(aggKey, newValue, aggValue) -> {
BdrData d = new BdrData();
d.setCharge(newValue.getBdr().getCost());
aggValue.add(d);
return aggValue;
},
Materialized.<String, ArrayList<BdrData>>as(
Stores.persistentWindowStore("store5",
retentionPeriod,
numSegments,
windowSize,
retainDuplicates))
.withKeySerde(Serdes.String())
.withValueSerde(listBdrDataSerde))
.toStream()
.process(() -> new WindowAggregatorProcessor());
Should the windowBy() value always match windowSize in persistentWindowStore()?
是的。
What should the retention period be set to? The retention policy of the source topic?
它应该匹配 windows 的保留期,您可以通过 Windows#until()
指定(默认为 1 天)
What do the number of segments do?
段数决定coarse/fine粒度数据(即旧windows)如何过期。分段大小将为 "retention-period / (#segments + 1)"。请注意,更多段会为您提供更细粒度的数据过期,但会增加开销(每个段使用它自己的 RocksDB 实例)
What are retaining duplicates for? Document seems to indicate to set to true for joins?
默认情况下,键必须是唯一的。如果启用保留重复项,则可以多次存储相同的密钥。启用重复会降低性能。
注:
API 的这一部分在即将发布的 2.1 版本中进行了重新设计和简化。比较 KIP-319 and KIP-328 了解详情。