使用 Apache Flink 的计划任务
Scheduled Task with Apache Flink
我有一个并行度为 5 的 flink 作业(现在!!)。 richFlatMap 流之一在 open(Configuration parameters)
方法中打开一个文件。在 flatMap
操作中没有任何打开操作,它只是读取文件来搜索一些东西。 (有一个实用程序 class,它有一个类似于 utilityClass.searchText('abc')
的方法)。这是样板代码:
public class MyFlatMap extends RichFlatMapFunction<...> {
private MyUtilityFile myFile;
@Override
public void open(Configuration parameters) throws Exception {
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}
此文件每天在特定时间由 python 脚本更新。因此,我还应该在 flatMap 流中打开新创建的文件(通过 python 脚本)。
我只是认为这可以通过 ScheduledExecutorService
只有一个线程池来完成。
I can not open this file every flatMap calls because it is big.
这是我要编写的样板代码:
public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private MyUtilityFile myFile;
@Override
public void run() {
myFile.Open("fileLocation");
}
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}
这个样板文件适合 Flink 环境吗?如果没有,我怎样才能按预定方式打开文件? (没有选项比如“after update file send event with kafka and read event by flink”)
或许可以直接实现ProcessingTimeCallback接口,支持定时器操作
public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback {
private MyUtilityFile myFile;
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
myFile.Open("fileLocation");
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
}
}
我有一个并行度为 5 的 flink 作业(现在!!)。 richFlatMap 流之一在 open(Configuration parameters)
方法中打开一个文件。在 flatMap
操作中没有任何打开操作,它只是读取文件来搜索一些东西。 (有一个实用程序 class,它有一个类似于 utilityClass.searchText('abc')
的方法)。这是样板代码:
public class MyFlatMap extends RichFlatMapFunction<...> {
private MyUtilityFile myFile;
@Override
public void open(Configuration parameters) throws Exception {
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}
此文件每天在特定时间由 python 脚本更新。因此,我还应该在 flatMap 流中打开新创建的文件(通过 python 脚本)。
我只是认为这可以通过 ScheduledExecutorService
只有一个线程池来完成。
I can not open this file every flatMap calls because it is big.
这是我要编写的样板代码:
public class MyFlatMap extends RichFlatMapFunction<...> implements Runnable {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private MyUtilityFile myFile;
@Override
public void run() {
myFile.Open("fileLocation");
}
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
}
这个样板文件适合 Flink 环境吗?如果没有,我怎样才能按预定方式打开文件? (没有选项比如“after update file send event with kafka and read event by flink”)
或许可以直接实现ProcessingTimeCallback接口,支持定时器操作
public class MyFlatMap extends RichFlatMapFunction<...> implements ProcessingTimeCallback {
private MyUtilityFile myFile;
@Override
public void open(Configuration parameters) throws Exception {
scheduler.scheduleAtFixedRate(this, 1, 1, TimeUnit.HOURS);
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
myFile.Open("fileLocation");
}
@Override
public void flatMap(...) throws Exception {
String text = myFile.searchText('abc');
if (text != null) // take an action
else // another action
}
@Override
public void onProcessingTime(long timestamp) throws Exception {
myFile.Open("fileLocation");
final long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + 3600000, this);
}
}