KTable和本地store的区别
Difference between KTable and local store
这些实体有什么区别?
如我所想,KTable - 具有 compaction
删除策略的简单 kafka 主题。此外,如果为 KTable 启用了日志记录,那么也会有更改日志,然后删除策略为 compaction,delete
.
本地存储 - 基于 RockDB 的内存中键值缓存。但是本地商店也有更新日志。
在这两种情况下,我们都会获得特定时间段内键的最后一个值(?)。本地存储用于聚合步骤、连接等。但是在它之后还创建了具有压缩策略的新主题。
例如:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and now i can query it as a regular key-value store
source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)) // Will new aggregation topic be created here with compaction deletion policy? Or only local store will be used?
我还可以使用构建器 builder.addStateStore(...)
创建一个状态存储,我可以在其中 enable/disable 日志记录(变更日志)和缓存(???)。
我读过这篇文章:https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html,但有些细节我还是不清楚。特别是当我们可以禁用 StreamCache(但不是 RockDB 缓存)并且我们将获得关系数据库的 CDC 系统的完整副本时
A KTable
是 合乎逻辑的 抽象 table 随着时间的推移而更新。此外,您可以不将其视为具体化的 table,而是将其视为由 table 的所有更新记录组成的变更日志流。比较 https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables。因此,如果您愿意,从概念上讲,KTable
是一种混合体,但是,更容易将其视为随时间更新的 table。
在内部,KTable
是使用 RocksDB 和 Kafka 中的主题实现的。 RocksDB 存储的是 table 的当前数据(注意,RocksDB 不是内存存储,可以写入磁盘)。同时,对 KTable
(即对 RocksDB)的每个更新都写入相应的 Kafka 主题。 Kafka 主题用于容错原因(请注意,RocksDB 本身被认为是短暂的,通过 RocksDB 写入磁盘不提供容错,但使用的 changelog 主题),并配置为启用日志压缩以确保从题目中可以恢复RocksDB的最新状态
如果你有一个由窗口聚合创建的 KTable
,Kafka 主题配置 compact,delete
到过期的旧数据(即旧的 windows)以避免这种情况table(即 RocksDB)无限增长。
除了 RocksDB,您还可以使用不写入磁盘的内存存储 KTable
。出于容错原因,此商店还将有一个更改日志主题,用于跟踪商店的所有更新。
如果您通过 builder.addStateStore()
手动添加存储,您还可以添加 RocksDB 或内存存储。在这种情况下,您可以启用类似于 KTable
的容错更改日志记录(注意,当创建 KTable 时,在内部,它使用完全相同的 API —— 即 KTable
是隐藏了一些内部细节的更高层次的抽象。
用于缓存:这是在 Kafka Streams 中和存储(RocksDB 或内存中)之上实现的,您可以 enable/disable 用于 "plain" 您手动添加的存储,对于K表。比较https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html因此,缓存是独立于RocksDB缓存的。
这些实体有什么区别?
如我所想,KTable - 具有 compaction
删除策略的简单 kafka 主题。此外,如果为 KTable 启用了日志记录,那么也会有更改日志,然后删除策略为 compaction,delete
.
本地存储 - 基于 RockDB 的内存中键值缓存。但是本地商店也有更新日志。
在这两种情况下,我们都会获得特定时间段内键的最后一个值(?)。本地存储用于聚合步骤、连接等。但是在它之后还创建了具有压缩策略的新主题。
例如:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and now i can query it as a regular key-value store
source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)) // Will new aggregation topic be created here with compaction deletion policy? Or only local store will be used?
我还可以使用构建器 builder.addStateStore(...)
创建一个状态存储,我可以在其中 enable/disable 日志记录(变更日志)和缓存(???)。
我读过这篇文章:https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html,但有些细节我还是不清楚。特别是当我们可以禁用 StreamCache(但不是 RockDB 缓存)并且我们将获得关系数据库的 CDC 系统的完整副本时
A KTable
是 合乎逻辑的 抽象 table 随着时间的推移而更新。此外,您可以不将其视为具体化的 table,而是将其视为由 table 的所有更新记录组成的变更日志流。比较 https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables。因此,如果您愿意,从概念上讲,KTable
是一种混合体,但是,更容易将其视为随时间更新的 table。
在内部,KTable
是使用 RocksDB 和 Kafka 中的主题实现的。 RocksDB 存储的是 table 的当前数据(注意,RocksDB 不是内存存储,可以写入磁盘)。同时,对 KTable
(即对 RocksDB)的每个更新都写入相应的 Kafka 主题。 Kafka 主题用于容错原因(请注意,RocksDB 本身被认为是短暂的,通过 RocksDB 写入磁盘不提供容错,但使用的 changelog 主题),并配置为启用日志压缩以确保从题目中可以恢复RocksDB的最新状态
如果你有一个由窗口聚合创建的 KTable
,Kafka 主题配置 compact,delete
到过期的旧数据(即旧的 windows)以避免这种情况table(即 RocksDB)无限增长。
除了 RocksDB,您还可以使用不写入磁盘的内存存储 KTable
。出于容错原因,此商店还将有一个更改日志主题,用于跟踪商店的所有更新。
如果您通过 builder.addStateStore()
手动添加存储,您还可以添加 RocksDB 或内存存储。在这种情况下,您可以启用类似于 KTable
的容错更改日志记录(注意,当创建 KTable 时,在内部,它使用完全相同的 API —— 即 KTable
是隐藏了一些内部细节的更高层次的抽象。
用于缓存:这是在 Kafka Streams 中和存储(RocksDB 或内存中)之上实现的,您可以 enable/disable 用于 "plain" 您手动添加的存储,对于K表。比较https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html因此,缓存是独立于RocksDB缓存的。