Apache Flink:如何使用 SourceFunction 以指定的时间间隔执行任务?
Apache Flink: how to use SourceFunction to execute a task at specified interval?
我需要我的 flink 作业以指定的时间间隔从数据库中提取记录并在处理后存档。我已经实现了 SourceFunction 以从数据库中获取所需的记录,并将 SourceFunction 添加为 StreamExecutionEnvironment 的源。如何指定 StreamExecutionEnvironment 需要每 10 分钟使用 SourceFunction 从数据库中获取记录?
源函数:
public class MongoDBSourceFunction implements SourceFunction<List<Book>>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
处理器:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}
您需要将此功能添加到 MongoDBSourceFunction
本身。例如,您可以在 open
方法中实例化一个 ScheduledExecutorService
并使用此执行程序安排读取任务。
请注意,在发出记录时保持检查点锁很重要。
我需要我的 flink 作业以指定的时间间隔从数据库中提取记录并在处理后存档。我已经实现了 SourceFunction 以从数据库中获取所需的记录,并将 SourceFunction 添加为 StreamExecutionEnvironment 的源。如何指定 StreamExecutionEnvironment 需要每 10 分钟使用 SourceFunction 从数据库中获取记录?
源函数:
public class MongoDBSourceFunction implements SourceFunction<List<Book>>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
处理器:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}
您需要将此功能添加到 MongoDBSourceFunction
本身。例如,您可以在 open
方法中实例化一个 ScheduledExecutorService
并使用此执行程序安排读取任务。
请注意,在发出记录时保持检查点锁很重要。