为什么我的 Flink SQL 查询具有非常不同的检查点大小?

Why do my Flink SQL queries have very different checkpoint sizes?

在我的项目中使用Flink Table SQL时,我发现如果我的SQL中有任何GROUP BY子句,检查点的大小会增加大大.

例如,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name
FROM
    FCBOX_POST_COUNT_VIEW

检查点大小将小于 500KB。

但是这样使用时,

INSERT INTO COMPANY_POST_DAY
SELECT
    sta_date,
    company_id,
    company_name,
    sum(ed_post_count)
FROM
    FCBOX_POST_COUNT_VIEW
GROUP BY
    sta_date, company_id, company_name, TUMBLE(procTime, INTERVAL '1' SECOND)

即使没有处理任何消息,检查点大小也会超​​过 70MB。像这样,

但是当使用DataStream API 和keyBy 而不是Table SQL GROUP BY 时,检查点大小是正常的,小于1MB。

为什么?

--------更新于 2019-03-25--------

经过一些测试和阅读源代码,我们发现这是 RocksDB 的原因。

当使用 RockDB 作为状态后端时,检查点的大小将超过每个键 5MB,而当使用文件系统作为状态后端时,检查点的大小将下降到每个键不到 100KB .

为什么 RocksDB 需要这么多 space 来保存状态?什么时候选择RocksDB?

首先,我不认为 70 MB 是巨大的状态。有许多具有多个 TB 状态的 Flink 作业。关于为什么两个查询的状态大小不同的问题:

第一个查询是一个简单的投影查询,也就是说每条记录都可以独立处理。因此,查询不需要 "remember" 任何记录,只需要用于恢复的流偏移量。

第二个查询执行 window 聚合,需要记住每个 window 的中间结果(部分和),直到时间足够长以至于结果是最终的并且可以发出。

由于 Flink SQL 查询被转换为 DataStream 运算符,因此 SQL 查询与使用 keyBy().window() 实现聚合之间没有太大区别。两者 运行 几乎相同的代码。

Update: 状态增加的原因已确定是RocksDBStateBackend引起的。此开销不是每个键的开销,而是每个有状态运算符的开销。由于 RocksDBStateBackend 旨在保存多个 GB 到 TB 的状态大小,因此几 MB 的开销可以忽略不计。