How to write to an S3 table sink in Flink without the "update and delete changes" error?
Consider this piece of code:
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
class Scratch {
  public static void main(String[] args) {
    StreamTableEnvironment tableEnv = /*some init code here*/;
    tableEnv.executeSql("CREATE TABLE my_table (\n" +
        "  id STRING,\n" +
        "  createdDate DATE,\n" +
        "  `date` STRING\n" +
        ") PARTITIONED BY (`date`)\n" +
        "WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 's3://my-bucket/',\n" +
        "  'format' = 'json'\n" +
        ")");
    tableEnv.executeSql("CREATE TABLE output_table (\n" +
        "  id STRING,\n" +
        "  created_date DATE,\n" +
        "  count_value BIGINT,\n" +
        "  PRIMARY KEY (id, created_date) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 's3://some-bucket/output-table/',\n" +
        "  'format' = 'json'\n" +
        ")");
    Table temp = tableEnv.sqlQuery(
        "SELECT id as id,\n" +
        "  max(createdDate) as created_date,\n" +
        "  COUNT(DISTINCT(id)) as count_value\n" +
        "FROM my_table\n" +
        "GROUP BY createdDate, id"
    );
    temp.executeInsert("output_table");
  }
}
This gives me the error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.output_table' doesn't support consuming update changes which is produced by node GroupAggregate(select=[MIN($f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value ])
Is there a way to write the aggregation to S3 with Flink (with the Flink job running in batch mode)?
Since you are running the query in streaming mode, it requires a sink that can handle the updates and deletes coming from the aggregation.
This will work if you either:
- produce the results in a CDC (changelog) format, such as Debezium (see the sketch after this list),
- or run the job in batch mode.
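For the first option, here is a minimal sketch of a changelog-accepting sink. This is an assumption on top of the answer, not something it spells out: it emits the changelog to Kafka with the debezium-json format rather than to S3, since the plain filesystem sink (as the error above shows) cannot consume update changes; the topic name and broker address are placeholders.
tableEnv.executeSql("CREATE TABLE output_changelog (\n" +
    "  id STRING,\n" +
    "  created_date DATE,\n" +
    "  count_value BIGINT\n" +
    ") WITH (\n" +
    // assumption: a Kafka sink instead of the S3 filesystem sink
    "  'connector' = 'kafka',\n" +
    // placeholder topic and broker address
    "  'topic' = 'output-changelog',\n" +
    "  'properties.bootstrap.servers' = 'broker:9092',\n" +
    // debezium-json encodes the INSERT/UPDATE/DELETE rows as changelog records
    "  'format' = 'debezium-json'\n" +
    ")");
temp.executeInsert("output_changelog");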
To run in batch mode, you can do it like this:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// create a TableEnvironment that executes queries in batch mode
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
If you need to use the Table API in batch mode while also having access to the DataStream API, that is only possible since Flink 1.14.
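A rough sketch of that combination, assuming Flink 1.14+ (the wiring below is my assumption, not part of the original answer): put the StreamExecutionEnvironment into batch runtime mode and build the StreamTableEnvironment on top of it.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// switch the DataStream runtime to batch execution
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// since Flink 1.14 the Table API bridge picks up the batch mode from the
// StreamExecutionEnvironment, so the aggregation above no longer produces
// update/delete changes for the filesystem sink
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);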