文件系统连接器不产生输出 & streamEnvironment.executes() 抛出 "no operators defined"
Filesystem connector produces no output & streamEnvironment.executes() throws "no operators defined"
我有以下代码想将数据生成的数据写入文件,但是当我运行应用程序时,没有创建目标目录,也没有写入数据。
当我在代码末尾添加 env.execute()
时,它会提示 No operators defined in streaming topology. Cannot execute.
我想问一下如何使应用程序工作,谢谢。
test("insert into table") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create temporary table abc(
name STRING,
age INT
) with (
'connector' = 'datagen'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val sql =
"""
select * from abc
""".stripMargin(' ')
val sinkDDL =
s"""
create temporary table xyz(
name STRING,
age INT
) with (
'connector' = 'filesystem',
'path' = 'D:\${System.currentTimeMillis()}-csv' ,
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(sinkDDL)
val insertInSQL =
"""
insert into xyz
select name, age from abc
""".stripMargin(' ')
tenv.executeSql(insertInSQL)
// env.execute()
}
我认为你应该在 table 执行中有 UDF,请参阅
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions
您可以看到示例,编写函数并将其插入到您的 sql 管道中,这在您的错误消息中充当“运算符”。
我认为它确实有效,只是没有 当我们认为它有效:)
我已经用 Blink planner in Flink 1.12:
测试过了
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"
最后在 StreamingEnvironment
上调用 env.execute()
实际上 不需要 ,因为程序前面的每个 .executeSql()
已经提交异步作业。然后,您代码中的接收器会附加到其中一个作业,而不是 env.execute()
将启动的作业(在本例中是空作业,会触发您提到的错误)。我在 this response on the mailing list.
中找到了相关线索
当我 运行 问题中的代码时(使用 Blink 规划器,并在我的主机上将输出调整为 'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv'
),我看到几个 hidden 正在逐步创建文件。我猜它们同样被隐藏在您的 Windows 主机上(下面以 .
开头的文件意味着隐藏在 Linux 上):
ls -ltra /tmp/hello-flink-1609574239647-csv
total 165876
drwxrwxrwt 40 root root 12288 Jan 2 08:57 ..
-rw-rw-r-- 1 svend svend 134217771 Jan 2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-0.inprogress.48863a2b-f022-401b-95e3-659ec4920162
drwxrwxr-x 2 svend svend 4096 Jan 2 08:59 .
-rw-rw-r-- 1 svend svend 35616014 Jan 2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-1.inprogress.3412bcb0-d30d-43be-819b-1acf26a0a8bb
发生的事情只是 FileSystem SQL connector 的滚动策略默认情况下在提交文件之前等待更长时间。
如果您从 IDE 开始编写代码,您可以按如下方式调整环境的创建(通常在 conf/flink-conf.yaml
中完成):
val props = new Properties
props.setProperty("execution.checkpointing.interval", "10000") // 10000 ms
val conf = ConfigurationUtils.createConfiguration(props)
val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(1, conf)
并在输出连接器中使用较小的文件大小:
create temporary table xyz(
name STRING,
age INT
) with (
'connector' = 'filesystem',
'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' ,
'format' = 'csv',
'sink.rolling-policy.file-size' = '1Mb'
CSV 文件现在应该可以更快地提交:
ls -ltra hello-flink-1609575075617-csv
total 17896
-rw-rw-r-- 1 svend svend 1048669 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-0
-rw-rw-r-- 1 svend svend 1048644 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-1
-rw-rw-r-- 1 svend svend 1048639 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-2
-rw-rw-r-- 1 svend svend 1048676 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-3
-rw-rw-r-- 1 svend svend 1048680 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-4
-rw-rw-r-- 1 svend svend 1048642 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-5
我有以下代码想将数据生成的数据写入文件,但是当我运行应用程序时,没有创建目标目录,也没有写入数据。
当我在代码末尾添加 env.execute()
时,它会提示 No operators defined in streaming topology. Cannot execute.
我想问一下如何使应用程序工作,谢谢。
test("insert into table") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create temporary table abc(
name STRING,
age INT
) with (
'connector' = 'datagen'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
val sql =
"""
select * from abc
""".stripMargin(' ')
val sinkDDL =
s"""
create temporary table xyz(
name STRING,
age INT
) with (
'connector' = 'filesystem',
'path' = 'D:\${System.currentTimeMillis()}-csv' ,
'format' = 'csv'
)
""".stripMargin(' ')
tenv.executeSql(sinkDDL)
val insertInSQL =
"""
insert into xyz
select name, age from abc
""".stripMargin(' ')
tenv.executeSql(insertInSQL)
// env.execute()
}
我认为你应该在 table 执行中有 UDF,请参阅
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/udfs.html#table-functions
您可以看到示例,编写函数并将其插入到您的 sql 管道中,这在您的错误消息中充当“运算符”。
我认为它确实有效,只是没有 当我们认为它有效:)
我已经用 Blink planner in Flink 1.12:
测试过了"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided"
最后在 StreamingEnvironment
上调用 env.execute()
实际上 不需要 ,因为程序前面的每个 .executeSql()
已经提交异步作业。然后,您代码中的接收器会附加到其中一个作业,而不是 env.execute()
将启动的作业(在本例中是空作业,会触发您提到的错误)。我在 this response on the mailing list.
当我 运行 问题中的代码时(使用 Blink 规划器,并在我的主机上将输出调整为 'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv'
),我看到几个 hidden 正在逐步创建文件。我猜它们同样被隐藏在您的 Windows 主机上(下面以 .
开头的文件意味着隐藏在 Linux 上):
ls -ltra /tmp/hello-flink-1609574239647-csv
total 165876
drwxrwxrwt 40 root root 12288 Jan 2 08:57 ..
-rw-rw-r-- 1 svend svend 134217771 Jan 2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-0.inprogress.48863a2b-f022-401b-95e3-659ec4920162
drwxrwxr-x 2 svend svend 4096 Jan 2 08:59 .
-rw-rw-r-- 1 svend svend 35616014 Jan 2 08:59 .part-393f5557-894a-4396-bdf9-c7813fdd1d75-0-1.inprogress.3412bcb0-d30d-43be-819b-1acf26a0a8bb
发生的事情只是 FileSystem SQL connector 的滚动策略默认情况下在提交文件之前等待更长时间。
如果您从 IDE 开始编写代码,您可以按如下方式调整环境的创建(通常在 conf/flink-conf.yaml
中完成):
val props = new Properties
props.setProperty("execution.checkpointing.interval", "10000") // 10000 ms
val conf = ConfigurationUtils.createConfiguration(props)
val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(1, conf)
并在输出连接器中使用较小的文件大小:
create temporary table xyz(
name STRING,
age INT
) with (
'connector' = 'filesystem',
'path' = '/tmp/hello-flink-${System.currentTimeMillis()}-csv' ,
'format' = 'csv',
'sink.rolling-policy.file-size' = '1Mb'
CSV 文件现在应该可以更快地提交:
ls -ltra hello-flink-1609575075617-csv
total 17896
-rw-rw-r-- 1 svend svend 1048669 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-0
-rw-rw-r-- 1 svend svend 1048644 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-1
-rw-rw-r-- 1 svend svend 1048639 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-2
-rw-rw-r-- 1 svend svend 1048676 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-3
-rw-rw-r-- 1 svend svend 1048680 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-4
-rw-rw-r-- 1 svend svend 1048642 Jan 2 09:11 part-a6158ce5-25ea-4361-be11-596a67989e4a-0-5