Flink 从 List<String> filePaths 转发文件

Flink forward files from List<String> filePaths

我们有来自数据库 table 的文件路径列表,其中包含创建时的时间戳。试图弄清楚我们如何使用 db 中的文件路径列表仅将那些文件从 nfs 转发到 kafka 接收器。

现在我正在使用 ContinuousFileMonitoringFunction 的自定义版本,文件夹的根目录将包含 DB 将显示的所有文件。此操作非常慢,因为要遍历文件夹以收集有关更新文件的信息,因为文件夹太大,数据很少。

Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));

DataSet<String> ds  = result.toDataSet();

ds有所有应该发送到kafka的文件路径。

以下是我打算实现的想法。但是考虑到 flink 并行性、flink 库支持等,是否有更有效的方法?

public class FileContentMap extends RichFlatMapFunction<String, String> {

      

    @Override
    public void flatMap(String input, Collector<String> out) throws Exception {

       
       
        // get the file path
        String filePath = input;

        String fileContent = readFile(input);

    out.collect(fileCOntent);

       
    }

    @Override
    public void open(Configuration config) {
       
    }
}

DataSet<String> contectDataSet = ds.map(new FileCOntentMap());

contectDataSet.addSink(kafkaProducer);

我觉得你的方法不错。也许更有效的方法是创建一个 RichParallelSourceFunction,在 open() 方法中调用数据库以获取已更新的文件列表,然后构建一个 in-memory特定来源 sub-task(类似于 filePath.hashCode() % numSubTasks == mySubTask)应该发出的文件列表,供您的 FileContentMap.

处理