为什么我的 Flink SQL 查询具有非常不同的检查点大小?
Why do my Flink SQL queries have very different checkpoint sizes?
在我的项目中使用Flink Table SQL时,我发现如果我的SQL中有任何GROUP BY
子句,检查点的大小会增加大大.
例如,
INSERT INTO COMPANY_POST_DAY
SELECT
sta_date,
company_id,
company_name
FROM
FCBOX_POST_COUNT_VIEW
检查点大小将小于 500KB。
但是这样使用时,
INSERT INTO COMPANY_POST_DAY
SELECT
sta_date,
company_id,
company_name,
sum(ed_post_count)
FROM
FCBOX_POST_COUNT_VIEW
GROUP BY
sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)
即使没有处理任何消息,检查点大小也会超过 70MB。像这样,
但是当使用DataStream API 和keyBy
而不是Table SQL GROUP BY
时,检查点大小是正常的,小于1MB。
为什么?
--------更新于 2019-03-25--------
经过一些测试和阅读源代码,我们发现这是 RocksDB 的原因。
当使用 RockDB 作为状态后端时,检查点的大小将超过每个键 5MB,而当使用文件系统作为状态后端时,检查点的大小将下降到每个键不到 100KB .
为什么 RocksDB 需要这么多 space 来保存状态?什么时候选择RocksDB?
首先,我不认为 70 MB 是巨大的状态。有许多具有多个 TB 状态的 Flink 作业。关于为什么两个查询的状态大小不同的问题:
第一个查询是一个简单的投影查询,也就是说每条记录都可以独立处理。因此,查询不需要 "remember" 任何记录,只需要用于恢复的流偏移量。
第二个查询执行 window 聚合,需要记住每个 window 的中间结果(部分和),直到时间足够长以至于结果是最终的并且可以发出。
由于 Flink SQL 查询被转换为 DataStream 运算符,因此 SQL 查询与使用 keyBy().window()
实现聚合之间没有太大区别。两者 运行 几乎相同的代码。
Update: 状态增加的原因已确定是RocksDBStateBackend引起的。此开销不是每个键的开销,而是每个有状态运算符的开销。由于 RocksDBStateBackend 旨在保存多个 GB 到 TB 的状态大小,因此几 MB 的开销可以忽略不计。
在我的项目中使用Flink Table SQL时,我发现如果我的SQL中有任何GROUP BY
子句,检查点的大小会增加大大.
例如,
INSERT INTO COMPANY_POST_DAY
SELECT
sta_date,
company_id,
company_name
FROM
FCBOX_POST_COUNT_VIEW
检查点大小将小于 500KB。
但是这样使用时,
INSERT INTO COMPANY_POST_DAY
SELECT
sta_date,
company_id,
company_name,
sum(ed_post_count)
FROM
FCBOX_POST_COUNT_VIEW
GROUP BY
sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)
即使没有处理任何消息,检查点大小也会超过 70MB。像这样,
但是当使用DataStream API 和keyBy
而不是Table SQL GROUP BY
时,检查点大小是正常的,小于1MB。
为什么?
--------更新于 2019-03-25--------
经过一些测试和阅读源代码,我们发现这是 RocksDB 的原因。
当使用 RockDB 作为状态后端时,检查点的大小将超过每个键 5MB,而当使用文件系统作为状态后端时,检查点的大小将下降到每个键不到 100KB .
为什么 RocksDB 需要这么多 space 来保存状态?什么时候选择RocksDB?
首先,我不认为 70 MB 是巨大的状态。有许多具有多个 TB 状态的 Flink 作业。关于为什么两个查询的状态大小不同的问题:
第一个查询是一个简单的投影查询,也就是说每条记录都可以独立处理。因此,查询不需要 "remember" 任何记录,只需要用于恢复的流偏移量。
第二个查询执行 window 聚合,需要记住每个 window 的中间结果(部分和),直到时间足够长以至于结果是最终的并且可以发出。
由于 Flink SQL 查询被转换为 DataStream 运算符,因此 SQL 查询与使用 keyBy().window()
实现聚合之间没有太大区别。两者 运行 几乎相同的代码。
Update: 状态增加的原因已确定是RocksDBStateBackend引起的。此开销不是每个键的开销,而是每个有状态运算符的开销。由于 RocksDBStateBackend 旨在保存多个 GB 到 TB 的状态大小,因此几 MB 的开销可以忽略不计。