Kafka Streams 构建 StateStoreSupplier:API 澄清

Kafka Streams building StateStoreSupplier: API clarifications

我正在使用 0.11.0.2 版本的 Kafka Streams。

为了利用 transform API,我使用 Stores.create 构建器方法创建了自己的 StateStoreSupplier。问题是某些 field/methods 的 javadoc 对我来说不够清楚。

val storeSupplier = Stores.create(STORE_NAME)
            .withStringKeys()
            .withStringValues()
            .persistent()
            .disableLogging()
            .windowed(WINDOW_SIZE, RETENTION, 3, false)
            .enableCaching()
            .build()

如何表示提到的变更日志?

/**
* Indicates that a changelog should not be created for the key-value store
*/
PersistentKeyValueFactory<K, V> disableLogging();

这 4 个值如何相互影响?每个 window 都有最大数量的元素 - windowSize?一旦到达新 window 开始?并且每个 window 可以在 RocksDB 的磁盘上划分为 numSegments 个文件?重复意味着键和值都相同,并且仅在相同的 window?

中检测到
 /**
 * Set the persistent store as a windowed key-value store
 * @param windowSize size of the windows
 * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
 * @param numSegments the maximum number of segments for rolling the windowed store
 * @param retainDuplicates whether or not to retain duplicate data within the window
 */
PersistentKeyValueFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);

这里隐含了什么样的缓存?

/**
* Caching should be enabled on the created store.
*/
PersistentKeyValueFactory<K, V> enableCaching();

我可以自信地回答其中 2/3 的问题:

How that mentioned changelog would be represented?

变更日志是一个名为 $applicationId-$storename-changelog 的主题。这是一个普通的键值主题,其中键是 table 键,值是 table 值。本主题由 Kafka Streams 创建和管理。如果你这样做 disableLogging,据我所知,如果商店以某种方式丢失而不重放你的整个拓扑(如果它是可重放的!)将无法恢复

What kind of caching is implied here?

访问底层 RocksDB 实例之前的 LRU 内存缓存。例如,参见 CachedStateStore and CachedKeyValueStore specifically, CachedKeyValueStore#getInternal()

关于:

How these 4 values affect each other? Each window has max number of elements - windowSize? Once it is reached new window started? And each window could be divided up to numSegments files at disk for RocksDB? Duplicate means same both key and value and it is detected only within the same window?

我最近没有看过这些内部结构,所以无法准确记住。不过我可以说以下内容:

  • 每个 window 没有最大数量的元素,除非您使用内存中的 LRU 存储。 Windows 以时间为基础存在,因此您的条目根据时间属于 window 或多个 windows,而不是 window 容量(通常没有固定容量)。 更新:需要注意的重要一点是,如果您使用的是缓存存储,它只会以偏移提交间隔指定的时间间隔定期刷新到磁盘。如果此类缓存存储支持 KTable,则 KTable 仅在 .
  • 时将消息转发给其子项
  • 是的,我相信每个 window 在磁盘上都被分成多个段。我最近没有看过代码,无法准确记住,我可能是错的。参见 RocksDBSegmentedBytesStore and its dependency Segments
  • 不确定在此上下文中是否存在重复项。