Apache Flink:如何使用 SourceFunction 以指定的时间间隔执行任务?

Apache Flink: how to use SourceFunction to execute a task at specified interval?

我需要我的 flink 作业以指定的时间间隔从数据库中提取记录并在处理后存档。我已经实现了 SourceFunction 以从数据库中获取所需的记录,并将 SourceFunction 添加为 StreamExecutionEnvironment 的源。如何指定 StreamExecutionEnvironment 需要每 10 分钟使用 SourceFunction 从数据库中获取记录?

源函数:

public class MongoDBSourceFunction implements SourceFunction<List<Book>>{

    public void cancel() {
        // TODO Auto-generated method stub
    }

    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {

        List<Book> books = getBooks();

        context.collect(books);

    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<Book>();

        //fetch all books from database     
        return books;
    }

}

处理器:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

    public static void main(String[] args) {

        final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MongoDBSourceFunction()).print();
    }

}

您需要将此功能添加到 MongoDBSourceFunction 本身。例如,您可以在 open 方法中实例化一个 ScheduledExecutorService 并使用此执行程序安排读取任务。

请注意,在发出记录时保持检查点锁很重要。