文件系统连接器不产生输出 & streamEnvironment.executes() 抛出 "no operators defined"

Filesystem connector produces no output & streamEnvironment.executes() throws "no operators defined"

我有以下代码想将数据生成的数据写入文件,但是当我运行应用程序时,没有创建目标目录,也没有写入数据。

当我在代码末尾添加 env.execute() 时,它会提示 No operators defined in streaming topology. Cannot execute.

我想问一下如何使应用程序工作,谢谢。

test("insert into table") {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)
    val ddl =
      """
      create temporary table abc(
       name STRING,
       age INT
      ) with (
        'connector' = 'datagen'
      )
      """.stripMargin(' ')

    tenv.executeSql(ddl)

    val sql =
      """
        select * from abc
      """.stripMargin(' ')

    val sinkDDL =
      s"""
      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = 'D:\${System.currentTimeMillis()}-csv' ,
       'format' = 'csv'

      )
      """.stripMargin(' ')

    tenv.executeSql(sinkDDL)

    val insertInSQL =
      """
      insert into xyz
      select name, age from abc
      """.stripMargin(' ')

    tenv.executeSql(insertInSQL)


//    env.execute()


  }

我认为你应该在 table 执行中有 UDF,请参阅

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions

您可以看到示例,编写函数并将其插入到您的 sql 管道中,这在您的错误消息中充当“运算符”。

我认为它确实有效,只是没有 我们认为它有效:)

我已经用 Blink planner in Flink 1.12:

测试过了
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"

最后在 StreamingEnvironment 上调用 env.execute() 实际上 不需要 ,因为程序前面的每个 .executeSql() 已经提交异步作业。然后,您代码中的接收器会附加到其中一个作业,而不是 env.execute() 将启动的作业(在本例中是空作业,会触发您提到的错误)。我在 this response on the mailing list.

中找到了相关线索

当我 运行 问题中的代码时(使用 Blink 规划器,并在我的主机上将输出调整为 'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv'),我看到几个 hidden 正在逐步创建文件。我猜它们同样被隐藏在您的 Windows 主机上(下面以 . 开头的文件意味着隐藏在 Linux 上):

ls -ltra /tmp/hello-flink-1609574239647-csv
total 165876
drwxrwxrwt 40 root  root      12288 Jan  2 08:57 ..
-rw-rw-r--  1 svend svend 134217771 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-0.inprogress.48863a2b-f022-401b-95e3-659ec4920162
drwxrwxr-x  2 svend svend      4096 Jan  2 08:59 .
-rw-rw-r--  1 svend svend  35616014 Jan  2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-1.inprogress.3412bcb0-d30d-43be-819b-1acf26a0a8bb

发生的事情只是 FileSystem SQL connector 的滚动策略默认情况下在提交文件之前等待更长时间。

如果您从 IDE 开始编写代码,您可以按如下方式调整环境的创建(通常在 conf/flink-conf.yaml 中完成):

  val props = new Properties
  props.setProperty("execution.checkpointing.interval", "10000")  // 10000 ms
  val conf = ConfigurationUtils.createConfiguration(props)
  val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(1, conf)

并在输出连接器中使用较小的文件大小:

      create temporary table xyz(
       name STRING,
       age INT
      ) with (
       'connector' = 'filesystem',
       'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' ,
       'format' = 'csv',
       'sink.rolling-policy.file-size' = '1Mb'

CSV 文件现在应该可以更快地提交:

ls -ltra hello-flink-1609575075617-csv
total 17896
-rw-rw-r--  1 svend svend 1048669 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-0
-rw-rw-r--  1 svend svend 1048644 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-1
-rw-rw-r--  1 svend svend 1048639 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-2
-rw-rw-r--  1 svend svend 1048676 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-3
-rw-rw-r--  1 svend svend 1048680 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-4
-rw-rw-r--  1 svend svend 1048642 Jan  2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-5