Apache Flink batch mode fails after a few minutes and prints the results
I'm using Apache Flink to read a CSV file, convert the records into a table, run a SQL query against that table, and print the result to standard output.
Code (simplified):
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Legacy file source (arguments elided in the original)
DataStream<Row> rowDataStreamSource = env.readFile(...).disableChaining();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
final Table table = tableEnv.fromChangelogStream(rowDataStreamSource, ordersSchema, ChangelogMode.insertOnly());
tableEnv.createTemporaryView("orders", table);
Table originalSQL = tableEnv.sqlQuery(...);
// execute() submits the query as a job; print() writes the result rows to stdout
originalSQL.execute().print();
env.execute();
```
A few minutes after running the job, I get this error:
```
21:29:49.999 [deploy.instance_IS_UNDEFINED,,,][mini-cluster-io-thread-3] INFO o.a.f.r.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
21:29:50.066 [deploy.instance_IS_UNDEFINED,,,][http-nio-8080-exec-1] WARN o.a.f.s.a.o.c.CollectResultFetcher - Failed to get job status so we assume that the job has terminated. Some data might be lost.
java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:877)
```
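The table with my SQL results is then printed below the exception.
What is causing this? Is there a way to tell Flink that there are no more records, so that it can finish the job and print the results?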
You should use `FileSource` instead of `readFile` for this to work properly in batch mode: https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/file/src/FileSource.html
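A minimal sketch of the `FileSource` approach follows. It assumes Flink 1.15 or later (where the line format class is `TextLineInputFormat`), a headerless CSV with two columns named `order_id` and `amount` (hypothetical names), and a plain comma split instead of a real CSV parser. It uses `fromDataStream` rather than `fromChangelogStream`, which should behave the same for insert-only data. A `FileSource` built without `monitorContinuously(...)` is bounded: it reads the input once and then finishes, so the batch job can end on its own. The query is run only through the Table API, and the sketch does not call `env.execute()` afterwards.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class FileSourceBatchExample {

    // Hypothetical input: one "order_id,amount" record per line, no header row.
    public static List<Row> run(String inputPath) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Bounded by default: reads the file(s) under inputPath once, then signals end of input.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath)).build();

        DataStream<Row> rows = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "orders-csv")
                .map(line -> {
                    // Naive split; a real job would use a proper CSV parser.
                    String[] fields = line.split(",");
                    return Row.of(fields[0].trim(), Double.parseDouble(fields[1].trim()));
                })
                // Lambdas lose generic type info, so declare the named row type explicitly.
                .returns(Types.ROW_NAMED(new String[] {"order_id", "amount"}, Types.STRING, Types.DOUBLE));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.createTemporaryView("orders", tableEnv.fromDataStream(rows));

        // Example query (hypothetical); execute() submits the job itself.
        Table result = tableEnv.sqlQuery("SELECT order_id, amount FROM orders WHERE amount > 0");
        List<Row> out = new ArrayList<>();
        try (CloseableIterator<Row> it = result.execute().collect()) {
            it.forEachRemaining(out::add);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        run(args[0]).forEach(System.out::println);
    }
}
```

Replacing `collect()` with `result.execute().print()` gives the tabular stdout output from the question; `collect()` is used here only so the rows can be returned to the caller.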
Or, better yet, you can define a table directly in SQL as the source that ingests the input file, as described here: https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/filesystem/
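A minimal sketch of the pure SQL variant follows. It assumes the Table planner, `flink-connector-files`, and `flink-csv` are on the classpath, and the same hypothetical two-column CSV. The table name `orders`, its columns, and the `inputPath` parameter are illustrative. No `StreamExecutionEnvironment` or `DataStream` conversion is involved: the filesystem connector reads the input as a bounded source, and the batch job finishes once it has been read. The path is concatenated into the DDL string without escaping, which is fine for a sketch but not for untrusted input.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class FilesystemTableBatchExample {

    public static List<Row> run(String inputPath) throws Exception {
        // Pure Table API program running in batch mode.
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical schema; the filesystem connector with the csv format acts as a bounded source.
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE orders ("
                        + "  order_id STRING,"
                        + "  amount DOUBLE"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '" + inputPath + "',"
                        + "  'format' = 'csv'"
                        + ")");

        // Example query (hypothetical); collect the rows instead of printing them.
        List<Row> out = new ArrayList<>();
        try (CloseableIterator<Row> it = tableEnv
                .sqlQuery("SELECT order_id, amount FROM orders WHERE amount > 0")
                .execute()
                .collect()) {
            it.forEachRemaining(out::add);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        run(args[0]).forEach(System.out::println);
    }
}
```

Defining the source in DDL keeps the whole pipeline inside the Table API, so there is no separate DataStream job to execute or keep alive after the query finishes.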