Apache Flink batch mode fails after a few minutes and prints the results

I am using Apache Flink to read a CSV file, convert the records into a table, run a SQL query against that table, and print the results to standard output.

Code (simplified):

 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 env.setRuntimeMode(RuntimeExecutionMode.BATCH);

 rowDataStreamSource = env.readFile(...).disableChaining();

 final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 final Table table = tableEnv.fromChangelogStream(rowDataStreamSource, ordersSchema, ChangelogMode.insertOnly());
 tableEnv.createTemporaryView("orders", table);

 Table originalSQL = tableEnv.sqlQuery(...);
 originalSQL.execute().print();

 env.execute();

A few minutes into running this job, the following error appears:

 21:29:49.999 [deploy.instance_IS_UNDEFINED,,,][mini-cluster-io-thread-3] INFO o.a.f.r.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
 21:29:50.066 [deploy.instance_IS_UNDEFINED,,,][http-nio-8080-exec-1] WARN o.a.f.s.a.o.c.CollectResultFetcher - Failed to get job status so we assume that the job has terminated. Some data might be lost.
 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
     at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:877)

The table with my SQL results is then printed below the exception.

What is causing this problem? Is there a way to tell Flink that there are no more records, so that it can finish the job and print the results?

You should use FileSource instead of readFile so that this works correctly in batch mode: https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/file/src/FileSource.html
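
For illustration, here is a minimal sketch of wiring a bounded FileSource into your pipeline. The path, the source name, and the line-by-line TextLineInputFormat (the class name differs in older Flink releases) are assumptions for the example, not part of your original code:

 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;

 // Bounded file source over the CSV file (path is a placeholder).
 FileSource<String> source = FileSource
         .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/orders.csv"))
         .build();

 // In BATCH runtime mode this source is bounded, so the job can finish
 // on its own once the whole file has been read.
 DataStream<String> lines = env.fromSource(
         source, WatermarkStrategy.noWatermarks(), "orders-file");

From there you can parse the lines into rows and register the view as you do now.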

Or, better yet, you can define a table directly in SQL as the source that ingests the input file, as described here: https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/filesystem/
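
A rough sketch of that approach, reusing the tableEnv from your code; the column names, types, and path are placeholders for illustration:

 // Define the source table via the filesystem connector; the CSV format
 // and the bounded path let the job terminate once the file is consumed.
 tableEnv.executeSql(
         "CREATE TABLE orders (" +
         "  order_id BIGINT," +
         "  product  STRING," +
         "  amount   DOUBLE" +
         ") WITH (" +
         "  'connector' = 'filesystem'," +
         "  'path' = 'file:///path/to/orders.csv'," +
         "  'format' = 'csv'" +
         ")");

 // Query the bounded table and print the result.
 tableEnv.sqlQuery("SELECT product, SUM(amount) AS total FROM orders GROUP BY product")
         .execute()
         .print();

With a bounded table source like this there is no DataStream to bridge from, so the fromChangelogStream conversion and the separate env.execute() call are no longer needed.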