如何以批处理模式显示 TableEnvironment 的 table 内容

How to show the table content for the TableEnvironment in batch mode

我正在使用 Flink 1.12.0,并使用以下代码处理批处理数据。

我要显示table的内容。当我调用 print 时,它会抱怨 table 无法转换为数据集,但我不想使用 BatchTableEnviroment,这是一种旧的计划器 API。

test("batch test") {
    val settings  = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
    val tenv = TableEnvironment.create(settings)
    val ddl =
      """
      create table sourceTable(
      key STRING,
      `date` STRING,
      price DOUBLE
      ) with (
        'connector' = 'filesystem',
        'path' = 'D:/projects/openprojects3/learn.flink.ioc/data/stock.csv',
        'format' = 'csv'
      )
      """.stripMargin(' ')
    tenv.executeSql(ddl)

    val table = tenv.sqlQuery(
      """
      select * from sourceTable
      """.stripMargin(' '))

    table.print()


  }

据我所知,处理时间临时 table 连接要求查找 table 由 LookupTableSource 支持。到目前为止,在 Flink 代码库本身中只有 Hive、HBase 和 JDBC 连接器实现了这个接口。如果您想快速尝试一下,您也可以使用 [1],它也实现了上述接口。

[1] https://github.com/knaufk/flink-faker