Flink SQL: source table 太大,无法放入内存

Flink SQL: source table is too big to fit into memory

我对 Flink 比较陌生,今天在 Flink 1.11.3 会话集群上使用 Flink SQL 时遇到了问题。

问题

我注册了一个使用 jdbc postgres 驱动程序的源 table。我正在尝试以 parquet 格式将一些数据从这个在线数据库移动到 AWS S3。这个 table 很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器 运行 内存不足。

我的观察

我发现当我这样做时 tableEnv.executeSql("select ... from huge_table limit 1000") flink 试图将整个源 table 扫描到内存中,然后才计划执行限制。

问题

因为我只关心最近几天的数据,有没有办法限制一个作业按时间戳扫描多少行?

附录

这是可以重现问题的最小设置(去除了很多噪音)

环境设置代码

var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

来源table Flink中的DDLSQL

CREATE TABLE source_transactions (
    txid STRING,
    username STRING,
    amount BIGINT,
    ts TIMESTAMP,
    PRIMARY KEY (txid) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:postgresql://my.bank',
    'table-name'='transactions',
    'driver'='org.postgresql.Driver',
    'username'='username',
    'password'='password',
    'scan.fetch-size'='2000'
)

Sink table Flink 中的 DDL SQL

CREATE TABLE sink_transactions (
    create_time TIMESTAMP,
    username STRING,
    delta_amount DOUBLE,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector'='filesystem',
    'path'='s3a://s3/path/to/transactions',
    'format'='parquet'
)

在 Flink 中插入查询 SQL

INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions

你的观察是对的,Flink 不支持 JDBC connector 的 limit pushdown 优化,并且有一个几乎合并的 PR 支持这个特性,这个将在 Flink 1.13 中使用,你可以 cherry-pick如果您急需此功能,请将此补丁添加到您的代码中。

1.JIRA: FLINK-19650 Support the limit push down for the Jdbc

2.PR: https://github.com/apache/flink/pull/13800