Flink SQL: source table 太大,无法放入内存
Flink SQL: source table is too big to fit into memory
我对 Flink 比较陌生,今天在 Flink 1.11.3 会话集群上使用 Flink SQL 时遇到了问题。
问题
我注册了一个使用 jdbc postgres 驱动程序的源 table。我正在尝试以 parquet 格式将一些数据从这个在线数据库移动到 AWS S3。这个 table 很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器 运行 内存不足。
我的观察
我发现当我这样做时 tableEnv.executeSql("select ... from huge_table limit 1000")
flink 试图将整个源 table 扫描到内存中,然后才计划执行限制。
问题
因为我只关心最近几天的数据,有没有办法限制一个作业按时间戳扫描多少行?
附录
这是可以重现问题的最小设置(去除了很多噪音)
环境设置代码
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);
来源table Flink中的DDLSQL
CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)
Sink table Flink 中的 DDL SQL
CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)
在 Flink 中插入查询 SQL
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
你的观察是对的,Flink 不支持 JDBC connector 的 limit pushdown 优化,并且有一个几乎合并的 PR 支持这个特性,这个将在 Flink 1.13 中使用,你可以 cherry-pick如果您急需此功能,请将此补丁添加到您的代码中。
1.JIRA: FLINK-19650 Support the limit push down for the Jdbc
我对 Flink 比较陌生,今天在 Flink 1.11.3 会话集群上使用 Flink SQL 时遇到了问题。
问题
我注册了一个使用 jdbc postgres 驱动程序的源 table。我正在尝试以 parquet 格式将一些数据从这个在线数据库移动到 AWS S3。这个 table 很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器 运行 内存不足。
我的观察
我发现当我这样做时 tableEnv.executeSql("select ... from huge_table limit 1000")
flink 试图将整个源 table 扫描到内存中,然后才计划执行限制。
问题
因为我只关心最近几天的数据,有没有办法限制一个作业按时间戳扫描多少行?
附录
这是可以重现问题的最小设置(去除了很多噪音)
环境设置代码
var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);
来源table Flink中的DDLSQL
CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)
Sink table Flink 中的 DDL SQL
CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)
在 Flink 中插入查询 SQL
INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions
你的观察是对的,Flink 不支持 JDBC connector 的 limit pushdown 优化,并且有一个几乎合并的 PR 支持这个特性,这个将在 Flink 1.13 中使用,你可以 cherry-pick如果您急需此功能,请将此补丁添加到您的代码中。
1.JIRA: FLINK-19650 Support the limit push down for the Jdbc