为什么 flink 会停止我的流应用程序?
why flink stops my stream application?
我的代码使用 readTextFile 读取日志文件,当我 运行 Flink (/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar
) 中的 jar 时,它成功处理了里面的行并停止了我的应用程序,而不是等待新的线。
我需要一些参数吗?
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")
提前致谢
您正在寻找
StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType)
对于 WatchType,您有以下选项
- ONLY_NEW_FILES,
- REPROCESS_WITH_APPENDED,
- PROCESS_ONLY_APPENDED;
来自
的流
StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)
读取所有文件后完成。我想,主要是为了开发时的本地测试。
我的代码使用 readTextFile 读取日志文件,当我 运行 Flink (/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar
) 中的 jar 时,它成功处理了里面的行并停止了我的应用程序,而不是等待新的线。
我需要一些参数吗?
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")
提前致谢
您正在寻找
StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType)
对于 WatchType,您有以下选项
- ONLY_NEW_FILES,
- REPROCESS_WITH_APPENDED,
- PROCESS_ONLY_APPENDED;
来自
的流StreamExecutionEnvironment.readTextFile(String filePath, String charsetName)
读取所有文件后完成。我想,主要是为了开发时的本地测试。