无法在 FileProcessing.PROCESS_CONTINUOS 模式下读取(文本)文件

Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

我有从特定路径连续读取文件的需求。

意味着 flink 作业应该不断地轮询指定位置并读取一个文件,该文件将以一定的间隔到达该位置。

示例:我在 windows 机器上的位置是 C:/inputfiles 在 2:00PM、file_2.txt 在 2:30PM、file_3.txt 在 3:00PM.

我用下面的代码进行了试验。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

public class ContinuousFileProcessingTest {

public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10);
    String localFsURI = "D:\FLink\2021_01_01\";
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    DataStream<String> inputStream =
            env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
    SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
    soso.print();
    soso.writeAsText("D:\FLink\completed", FileSystem.WriteMode.OVERWRITE);
    env.execute("read and write");
}
}

现在为了在 flink 集群上测试这个,我使用 flink 的 1.9.2 版本启动了 flink 集群,我能够实现我的目标,即每隔一段时间连续读取文件。

注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。

但现在我必须将 flink 的版本从 1.9.2 升级到 1.12。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。

与 windows 路径不同,我根据 docker 位置更改了文件位置,但上面的相同程序不在 运行 那里。

此外: 访问文件不是 problem.Means 如果我在开始作业之前放置文件,则此作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这个新添加的文件。

需要帮助找到解决方案。

提前致谢。

尝试将示例代码中的 directoryScanInterval 减少到 Duration.ofSeconds(50).toMillis() 并检查 StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC) 模式。

对于从 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/RuntimeExecutionMode.html

引用的 RuntimeExecutionMode

工作代码如下:

public class ContinuousFileProcessingTest { 私有静态最终记录器日志 = LoggerFactory.getLogger(ReadSpecificFilesFlinkBatch.class);

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10);
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    String localFsURI = "file:///usr/test";
    // create the monitoring source along with the necessary readers.
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
    log.info("format : " + format.toString());
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    log.info("setFilesFilter : " + FilePathFilter.createDefaultFilter().toString());
    log.info("getFilesFilter : " + format.getFilePath().toString());
    DataStream<String> inputStream =
            env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY,  Duration.ofSeconds(50).toMillis());
    SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
    soso.writeAsText("file:///usr/test/completed.txt", FileSystem.WriteMode.OVERWRITE);
    env.execute("read and write");
}        

}

此代码适用于 docker Flink 1.12 桌面和容器文件路径为 file:///usr/test。注意保持并行度至少为 2,以便可以并行处理文件。