使用 Apache Flink 的计划任务

Scheduled Task with Apache Flink

我有一个并行度为 5 的 flink 作业(现在!!)。 richFlatMap 流之一在 open(Configuration parameters) 方法中打开一个文件。在 flatMap 操作中没有任何打开操作,它只是读取文件来搜索一些东西。 (有一个实用程序 class,它有一个类似于 utilityClass.searchText('abc') 的方法)。这是样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> {

    private MyUtilityFile myFile;

    @Override
    public void open(Configuration parameters) throws Exception {
        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

}

此文件每天在特定时间由 python 脚本更新。因此,我还应该在 flatMap 流中打开新创建的文件(通过 python 脚本)。

我只是认为这可以通过 ScheduledExecutorService 只有一个线程池来完成。

I can not open this file every flatMap calls because it is big.

这是我要编写的样板代码:

public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private MyUtilityFile myFile;

    @Override
    public void run() {
       myFile.Open("fileLocation");     
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

}

这个样板文件适合 Flink 环境吗?如果没有,我怎样才能按预定方式打开文件? (没有选项比如“after update file send event with kafka and read event by flink”)

或许可以直接实现ProcessingTimeCallback接口,支持定时器操作

public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback { 
    private MyUtilityFile myFile;

 
    @Override
    public void open(Configuration parameters) throws Exception {
        scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);

        myFile.Open("fileLocation");
    }

    @Override
    public void flatMap(...) throws Exception {
        String text = myFile.searchText('abc');
        if (text != null) // take an action
        else // another action
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        myFile.Open("fileLocation");

        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + 3600000, this);
    }
}