关于 table 与 dataset/datastream 之间转换的困惑
super confused with table and dataset or datastream conversion
我正在使用 Flink 1.12,我对何时可以执行 table 和 dataset/datastream 转换感到非常困惑。
在下面的代码中,我想把table的内容打印到控制台,我尝试了以下3种方式
,他们都抛出异常
- table.toDataSet[Row].print()
- table.toAppendStream[Row].print()
- table.print()
我会问如何将table内容打印到控制台,例如,使用print
方法
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment, TableResult}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row
object Sql021_PlannerOldBatchTest {
  /**
   * Reads a CSV file through a connector-defined temporary table and prints the
   * query result to the console.
   *
   * Why the original three attempts failed: `env` is a unified [[TableEnvironment]]
   * (Blink planner, batch mode), NOT a `StreamTableEnvironment`/`BatchTableEnvironment`
   * bridge. A table created here does not "originate from" a Scala DataSet or
   * DataStream, so `toDataSet[Row]` and `toAppendStream[Row]` throw the quoted
   * exceptions, and `Table` itself has no `print` method.
   *
   * The supported way in Flink 1.12 is to execute the table and print its
   * `TableResult`, which works in both batch and streaming mode.
   */
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val env = TableEnvironment.create(settings)

    // CSV format with a comma delimiter; schema is declared explicitly below.
    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.DOUBLE())

    // Register the file as a temporary table named "sourceTable".
    env.connect(new FileSystem().path("D:/stock.csv"))
      .withSchema(schema)
      .withFormat(fmt)
      .createTemporaryTable("sourceTable")

    val table = env.sqlQuery("select * from sourceTable")

    // FIX: collect and print via TableResult instead of converting to a
    // DataSet/DataStream (which is impossible from a unified TableEnvironment).
    table.execute().print()
  }
}
在流处理的情况下,这会起作用
tenv.toAppendStream(table, classOf[Row]).print()
env.execute()
以及你可以做的批处理案例
val tableResult: TableResult = env.executeSql("select * from sourceTable")
tableResult.print()
我正在使用 Flink 1.12,我对何时可以执行 table 和 dataset/datastream 转换感到非常困惑。 在下面的代码中,我想把table的内容打印到控制台,我尝试了以下3种方式 ,他们都抛出异常
- table.toDataSet[Row].print()
- table.toAppendStream[Row].print()
- table.print()
我会问如何将table内容打印到控制台,例如,使用print
方法
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment, TableResult}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row
object Sql021_PlannerOldBatchTest {
  /**
   * Queries a CSV-backed temporary table and prints the rows to the console.
   *
   * Note on the failed attempts: because `TableEnvironment.create(settings)`
   * returns the planner-agnostic environment (no DataSet/DataStream bridge),
   * `table.toDataSet[Row]` and `table.toAppendStream[Row]` both fail with
   * "Only tables that originate from Scala DataSets/DataStreams can be
   * converted ...", and `Table` exposes no `print`. In Flink 1.12 the table
   * must be executed first; the resulting `TableResult` can be printed.
   */
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val env = TableEnvironment.create(settings)

    // Describe the CSV source: comma-delimited, three typed columns.
    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.DOUBLE())
    env.connect(new FileSystem().path("D:/stock.csv"))
      .withSchema(schema)
      .withFormat(fmt)
      .createTemporaryTable("sourceTable")

    val table = env.sqlQuery("select * from sourceTable")

    // FIX: run the job and print the collected result — no stream/set
    // conversion is needed (or possible) with a unified TableEnvironment.
    table.execute().print()
  }
}
在流处理的情况下,这会起作用
tenv.toAppendStream(table, classOf[Row]).print()
env.execute()
以及你可以做的批处理案例
val tableResult: TableResult = env.executeSql("select * from sourceTable")
tableResult.print()