Confused about converting between Table and DataSet/DataStream

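I'm using Flink 1.12 and I'm confused about when a Table can be converted to a DataSet or DataStream. In the code below I want to print the contents of the table to the console. I tried the following three approaches, and each of them fails: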

  1. table.toDataSet[Row].print()
  2. table.toAppendStream[Row].print()
  3. table.print()

How can I print the contents of the table to the console, for example with a print method?

import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, TableEnvironment, TableResult}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.types.Row

object Sql021_PlannerOldBatchTest {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

    val env = TableEnvironment.create(settings)

    val fmt = new Csv().fieldDelimiter(',').deriveSchema()
    val schema = new Schema()
      .field("a", DataTypes.STRING())
      .field("b", DataTypes.STRING())
      .field("c", DataTypes.DOUBLE())

    env.connect(new FileSystem().path("D:/stock.csv")).withSchema(schema).withFormat(fmt).createTemporaryTable("sourceTable")

    val table = env.sqlQuery("select * from sourceTable")

    // ERROR: Only tables that originate from Scala DataSets can be converted to Scala DataSets.
    // table.toDataSet[Row].print()

    // ERROR: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
    table.toAppendStream[Row].print()

    // ERROR: Table doesn't have a print method.
    // table.print()

  }

}

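In the streaming case, this works (here tenv is a StreamTableEnvironment and env is the StreamExecutionEnvironment it was created from):

tenv.toAppendStream[Row](table).print()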
env.execute()
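
For context, here is a minimal sketch of the full streaming setup, under some assumptions: the object name StreamPrintSketch and the in-memory sample rows are hypothetical stand-ins for the CSV source in the question, used only to keep the example self-contained. The point it illustrates is that the table environment has to be a StreamTableEnvironment (not a plain TableEnvironment) for the conversion to a DataStream to be available.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object StreamPrintSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    // A StreamTableEnvironment is tied to a StreamExecutionEnvironment,
    // which is what makes Table <-> DataStream conversion possible.
    val tenv = StreamTableEnvironment.create(env, settings)

    // Hypothetical sample rows standing in for the contents of stock.csv.
    val rows = env.fromElements(("x1", "y1", 1.0), ("x2", "y2", 2.0))
    tenv.createTemporaryView("sourceTable", rows, $"a", $"b", $"c")

    val table = tenv.sqlQuery("select * from sourceTable")

    // Convert the Table to an append-only DataStream[Row] and print it.
    tenv.toAppendStream[Row](table).print()
    env.execute()
  }
}
```

The same query against a table registered through a connector should behave the same way, as long as it is registered on tenv.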

And in the batch case you can do:

val tableResult: TableResult = env.executeSql("select * from sourceTable")
tableResult.print()
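
If you already hold a Table object (as in the question), Table.execute() returns a TableResult as well, so you do not have to repeat the SQL string. The following is a rough sketch under assumptions: the object name BatchPrintSketch and the in-memory rows created with fromValues are hypothetical and only replace the CSV source so that the example runs on its own.

```scala
import org.apache.flink.table.api._

object BatchPrintSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val env = TableEnvironment.create(settings)

    // Hypothetical in-memory rows standing in for stock.csv;
    // without explicit names the columns are called f0, f1, f2.
    val source = env.fromValues(row("x1", "y1", 1.0), row("x2", "y2", 2.0))
    env.createTemporaryView("sourceTable", source)

    val table = env.sqlQuery("select * from sourceTable")

    // Table itself has no print(), but execute() yields a TableResult that does.
    table.execute().print()
  }
}
```

This stays entirely inside the unified TableEnvironment, so no DataSet or DataStream conversion is involved.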