如何以批处理模式显示 TableEnvironment 的 table 内容
How to show the table content for the TableEnvironment in batch mode
我正在使用 Flink 1.12.0,并使用以下代码处理批处理数据。
我要显示table的内容。当我调用 print
时,它会抱怨 table 无法转换为数据集,但我不想使用 BatchTableEnviroment,这是一种旧的计划器 API。
test("batch test") {
val settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
val tenv = TableEnvironment.create(settings)
val ddl =
"""
create table sourceTable(
key STRING,
`date` STRING,
price DOUBLE
) with (
'connector' = 'filesystem',
'path' = 'D:/projects/openprojects3/learn.flink.ioc/data/stock.csv',
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val table = tenv.sqlQuery(
"""
select * from sourceTable
""".stripMargin(' '))
table.print()
}
据我所知,处理时间临时 table 连接要求查找 table 由 LookupTableSource
支持。到目前为止,在 Flink 代码库本身中只有 Hive、HBase 和 JDBC 连接器实现了这个接口。如果您想快速尝试一下,您也可以使用 [1],它也实现了上述接口。
我正在使用 Flink 1.12.0,并使用以下代码处理批处理数据。
我要显示table的内容。当我调用 print
时,它会抱怨 table 无法转换为数据集,但我不想使用 BatchTableEnviroment,这是一种旧的计划器 API。
test("batch test") {
val settings = EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build()
val tenv = TableEnvironment.create(settings)
val ddl =
"""
create table sourceTable(
key STRING,
`date` STRING,
price DOUBLE
) with (
'connector' = 'filesystem',
'path' = 'D:/projects/openprojects3/learn.flink.ioc/data/stock.csv',
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val table = tenv.sqlQuery(
"""
select * from sourceTable
""".stripMargin(' '))
table.print()
}
据我所知,处理时间临时 table 连接要求查找 table 由 LookupTableSource
支持。到目前为止,在 Flink 代码库本身中只有 Hive、HBase 和 JDBC 连接器实现了这个接口。如果您想快速尝试一下,您也可以使用 [1],它也实现了上述接口。