无法在 FileProcessing.PROCESS_CONTINUOS 模式下读取(文本)文件
Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode
我有从特定路径连续读取文件的需求。
意味着 flink 作业应该不断地轮询指定位置并读取一个文件,该文件将以一定的间隔到达该位置。
示例:我在 windows 机器上的位置是 C:/inputfiles 在 2:00PM、file_2.txt 在 2:30PM、file_3.txt 在 3:00PM.
我用下面的代码进行了试验。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
public class ContinuousFileProcessingTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);
String localFsURI = "D:\FLink\2021_01_01\";
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> inputStream =
env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
soso.print();
soso.writeAsText("D:\FLink\completed", FileSystem.WriteMode.OVERWRITE);
env.execute("read and write");
}
}
现在为了在 flink 集群上测试这个,我使用 flink 的 1.9.2 版本启动了 flink 集群,我能够实现我的目标,即每隔一段时间连续读取文件。
注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。
但现在我必须将 flink 的版本从 1.9.2 升级到 1.12。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。
与 windows 路径不同,我根据 docker 位置更改了文件位置,但上面的相同程序不在 运行 那里。
此外:
访问文件不是 problem.Means 如果我在开始作业之前放置文件,则此作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这个新添加的文件。
需要帮助找到解决方案。
提前致谢。
尝试将示例代码中的 directoryScanInterval 减少到 Duration.ofSeconds(50).toMillis() 并检查 StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC) 模式。
引用的 RuntimeExecutionMode
工作代码如下:
public class ContinuousFileProcessingTest {
私有静态最终记录器日志 = LoggerFactory.getLogger(ReadSpecificFilesFlinkBatch.class);
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
String localFsURI = "file:///usr/test";
// create the monitoring source along with the necessary readers.
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
log.info("format : " + format.toString());
format.setFilesFilter(FilePathFilter.createDefaultFilter());
log.info("setFilesFilter : " + FilePathFilter.createDefaultFilter().toString());
log.info("getFilesFilter : " + format.getFilePath().toString());
DataStream<String> inputStream =
env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, Duration.ofSeconds(50).toMillis());
SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
soso.writeAsText("file:///usr/test/completed.txt", FileSystem.WriteMode.OVERWRITE);
env.execute("read and write");
}
}
此代码适用于 docker Flink 1.12 桌面和容器文件路径为 file:///usr/test。注意保持并行度至少为 2,以便可以并行处理文件。
我有从特定路径连续读取文件的需求。
意味着 flink 作业应该不断地轮询指定位置并读取一个文件,该文件将以一定的间隔到达该位置。
示例:我在 windows 机器上的位置是 C:/inputfiles 在 2:00PM、file_2.txt 在 2:30PM、file_3.txt 在 3:00PM.
我用下面的代码进行了试验。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.List;
public class ContinuousFileProcessingTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);
String localFsURI = "D:\FLink\2021_01_01\";
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
DataStream<String> inputStream =
env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
soso.print();
soso.writeAsText("D:\FLink\completed", FileSystem.WriteMode.OVERWRITE);
env.execute("read and write");
}
}
现在为了在 flink 集群上测试这个,我使用 flink 的 1.9.2 版本启动了 flink 集群,我能够实现我的目标,即每隔一段时间连续读取文件。
注意:Flink 的 1.9.2 版本可以在 windows 机器上启动集群。
但现在我必须将 flink 的版本从 1.9.2 升级到 1.12。我们使用 docker 在 1.12 上启动集群(与 1.9.2 不同)。
与 windows 路径不同,我根据 docker 位置更改了文件位置,但上面的相同程序不在 运行 那里。
此外: 访问文件不是 problem.Means 如果我在开始作业之前放置文件,则此作业会正确读取这些文件,但如果我在运行时添加任何新文件,则它不会读取这个新添加的文件。
需要帮助找到解决方案。
提前致谢。
尝试将示例代码中的 directoryScanInterval 减少到 Duration.ofSeconds(50).toMillis() 并检查 StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC) 模式。
工作代码如下:
public class ContinuousFileProcessingTest { 私有静态最终记录器日志 = LoggerFactory.getLogger(ReadSpecificFilesFlinkBatch.class);
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
String localFsURI = "file:///usr/test";
// create the monitoring source along with the necessary readers.
TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
log.info("format : " + format.toString());
format.setFilesFilter(FilePathFilter.createDefaultFilter());
log.info("setFilesFilter : " + FilePathFilter.createDefaultFilter().toString());
log.info("getFilesFilter : " + format.getFilePath().toString());
DataStream<String> inputStream =
env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, Duration.ofSeconds(50).toMillis());
SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
soso.writeAsText("file:///usr/test/completed.txt", FileSystem.WriteMode.OVERWRITE);
env.execute("read and write");
}
}
此代码适用于 docker Flink 1.12 桌面和容器文件路径为 file:///usr/test。注意保持并行度至少为 2,以便可以并行处理文件。