Flink 通过 table DSL 创建 table

Flink create table via table DSL

为了创建 table,我使用 SQL 语法,如

    val tableEnv = StreamTableEnvironment.create(env, settings)
    tableEnv.executeSql(
      "CREATE TABLE asset (smth STRING) " +
        "WITH ('connector' = 'jdbc', " +
        "'url' = 'jdbc:mysql://host:3306/db', " +
        "'username' = 'user', " +
        "'password' = 'pass', " +
        "'table-name' = 'table')"
    ) 

是否有通过 Table API DSL 定义 table 的选项?

您可以做的是从代码中删除这些 table 定义并将它们存储在持久目录中。但是对于在 in-memory 目录中创建临时 tables,你正在做的就是它的完成方式。

您可以使用以下 Table API 方法创建完全相同的 table:

    Schema schema =
            Schema.newBuilder()
                    .column("smth", DataTypes.STRING())
                    .build();
    TableDescriptor tableDescriptor =
            TableDescriptor.forConnector("jdbc")
                    .option(JdbcConnectorOptions.URL, "jdbc:mysql://host:3306/db")
                    .option(JdbcConnectorOptions.USERNAME, "user")
                    .option(JdbcConnectorOptions.PASSWORD, "pass")
                    .option(JdbcConnectorOptions.TABLE_NAME, "table")
                    .schema(schema)
                    .build();
    tEnv.createTable("asset", tableDescriptor);

要创建临时 table,请使用 tEnv.createTemporaryTable

查看 TableDescriptor and Schema 了解更多详情。