Flink 通过 table DSL 创建 table
Flink create table via table DSL
为了创建 table,我使用 SQL 语法,如
val tableEnv = StreamTableEnvironment.create(env, settings)
tableEnv.executeSql(
"CREATE TABLE asset (smth STRING) " +
"WITH ('connector' = 'jdbc', " +
"'url' = 'jdbc:mysql://host:3306/db', " +
"'username' = 'user', " +
"'password' = 'pass', " +
"'table-name' = 'table')"
)
是否有通过 Table API DSL 定义 table 的选项?
您可以做的是从代码中删除这些 table 定义并将它们存储在持久目录中。但是对于在 in-memory 目录中创建临时 tables,你正在做的就是它的完成方式。
您可以使用以下 Table API 方法创建完全相同的 table:
Schema schema =
Schema.newBuilder()
.column("smth", DataTypes.STRING())
.build();
TableDescriptor tableDescriptor =
TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, "jdbc:mysql://host:3306/db")
.option(JdbcConnectorOptions.USERNAME, "user")
.option(JdbcConnectorOptions.PASSWORD, "pass")
.option(JdbcConnectorOptions.TABLE_NAME, "table")
.schema(schema)
.build();
tEnv.createTable("asset", tableDescriptor);
要创建临时 table,请使用 tEnv.createTemporaryTable
。
查看 TableDescriptor
and Schema
了解更多详情。
为了创建 table,我使用 SQL 语法,如
val tableEnv = StreamTableEnvironment.create(env, settings)
tableEnv.executeSql(
"CREATE TABLE asset (smth STRING) " +
"WITH ('connector' = 'jdbc', " +
"'url' = 'jdbc:mysql://host:3306/db', " +
"'username' = 'user', " +
"'password' = 'pass', " +
"'table-name' = 'table')"
)
是否有通过 Table API DSL 定义 table 的选项?
您可以做的是从代码中删除这些 table 定义并将它们存储在持久目录中。但是对于在 in-memory 目录中创建临时 tables,你正在做的就是它的完成方式。
您可以使用以下 Table API 方法创建完全相同的 table:
Schema schema =
Schema.newBuilder()
.column("smth", DataTypes.STRING())
.build();
TableDescriptor tableDescriptor =
TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, "jdbc:mysql://host:3306/db")
.option(JdbcConnectorOptions.USERNAME, "user")
.option(JdbcConnectorOptions.PASSWORD, "pass")
.option(JdbcConnectorOptions.TABLE_NAME, "table")
.schema(schema)
.build();
tEnv.createTable("asset", tableDescriptor);
要创建临时 table,请使用 tEnv.createTemporaryTable
。
查看 TableDescriptor
and Schema
了解更多详情。