以连续的方式从自定义源写入数据到 flink
Write data from custom source to flink in continuous way
第一次使用Apache Flink(1.3.1),有疑问。更详细地说,我正在使用 flink-core、flink-cep 和 flink-streaming 库。我的应用程序是一个 Akka ActorSystem,它使用来自 RabbitMQ 的消息,各种参与者处理这些消息。在一些演员中,我想从 Flink 实例化一个 StreamExecutionEnvironment
并处理传入的消息。因此我写了一个自定义源代码 class 扩展了 RichSourceFunction
class。发现一切正常,除了一件事:我不知道如何将数据发送到我的 Flink 扩展。这是我的设置:
public class FlinkExtension {
private static StreamExecutionEnvironment environment;
private DataStream<ValueEvent> input;
private CustomSourceFunction function;
public FlinkExtension(){
environment = StreamExecutionEnvironment.getExecutionEnvironment();
function = new CustomSourceFunction();
input = environment.addSource(function);
PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());
DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
@Override
public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
return null; //TODO
}
});
warnings.print();
try {
environment.execute();
} catch(Exception e){
e.printStackTrace();
}
}
private Pattern<ValueEvent, ?> _pattern(){
return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
@Override
public boolean filter(ValueEvent value) throws Exception {
return value.getValue() > 10;
}
});
}
public void sendData(ValueEvent value){
function.sendData(value);
}
}
这是我的自定义源函数:
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {
private volatile boolean run = false;
private SourceContext<ValueEvent> context;
@Override
public void open(Configuration parameters){
run = true;
}
@Override
public void run(SourceContext<ValueEvent> ctx) throws Exception {
this.context = ctx;
while (run){
}
}
public void sendData(ValueEvent value){
this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
}
@Override
public void cancel() {
run = false;
}
}
所以我想从外部调用我的FlinkExtension
class中的方法sendData
,以连续的方式向我的FlinkExtension
写入数据。这是我的 JUnit 测试应该将数据发送到扩展然后将数据写入 SourceContext
。
@Test
public void testSendData(){
FlinkExtension extension = new FlinkExtension();
extension.sendData(new ValueEvent(30));
}
但是如果我运行测试,没有任何反应,应用程序在CustomSourceFunction
的运行方法中挂起。我还尝试在 CustomSourceFunction
运行 方法中创建一个新的无限线程。
总结一下:有人知道如何以连续的方式将数据从应用程序写入 Flink 实例吗?
Flink 源连接器通过其 运行() 方法在 while(运行) 循环中调用 collect()(或 collectWithTimestamp())来发出连续的数据流。如果你想研究一个例子,Apache NiFi 源并不像大多数的那么复杂; here's its run method.
问题是 CustomSourceFunction
方法和 sendData
方法使用了不同的对象实例。因此,context
对象未在方法之间共享,并且添加新的 ValueEvent
不起作用。
要解决此问题,请将 run
方法使用的对象实例存储为 CustomSourceFunction
class 的静态成员变量。当您需要创建一个新的 ValueEvent
时,请在同一对象实例上调用 sendData
方法。
查看下面的示例代码
package RuleSources;
import Rules.Rule;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.ArrayList;
public class DynamicRuleSource extends AlertingRuleSource {
private static DynamicRuleSource sourceObj;
private SourceContext<Rule> ctx;
public static DynamicRuleSource getSourceObject() {
return sourceObj;
}
public void run(SourceContext<Rule> ctx) throws Exception {
this.ctx = ctx;
sourceObj = this;
while(true) {
Thread.sleep(100);
}
}
public void addRule(Rule rule) {
ctx.collect(rule);
}
@Override
public void cancel() {
}
}
添加新规则
public static void addRule(Rule rule) throws Exception {
AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
sourceObject.addRule(rule);
}
第一次使用Apache Flink(1.3.1),有疑问。更详细地说,我正在使用 flink-core、flink-cep 和 flink-streaming 库。我的应用程序是一个 Akka ActorSystem,它使用来自 RabbitMQ 的消息,各种参与者处理这些消息。在一些演员中,我想从 Flink 实例化一个 StreamExecutionEnvironment
并处理传入的消息。因此我写了一个自定义源代码 class 扩展了 RichSourceFunction
class。发现一切正常,除了一件事:我不知道如何将数据发送到我的 Flink 扩展。这是我的设置:
public class FlinkExtension {
private static StreamExecutionEnvironment environment;
private DataStream<ValueEvent> input;
private CustomSourceFunction function;
public FlinkExtension(){
environment = StreamExecutionEnvironment.getExecutionEnvironment();
function = new CustomSourceFunction();
input = environment.addSource(function);
PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());
DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
@Override
public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
return null; //TODO
}
});
warnings.print();
try {
environment.execute();
} catch(Exception e){
e.printStackTrace();
}
}
private Pattern<ValueEvent, ?> _pattern(){
return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
@Override
public boolean filter(ValueEvent value) throws Exception {
return value.getValue() > 10;
}
});
}
public void sendData(ValueEvent value){
function.sendData(value);
}
}
这是我的自定义源函数:
public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {
private volatile boolean run = false;
private SourceContext<ValueEvent> context;
@Override
public void open(Configuration parameters){
run = true;
}
@Override
public void run(SourceContext<ValueEvent> ctx) throws Exception {
this.context = ctx;
while (run){
}
}
public void sendData(ValueEvent value){
this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
}
@Override
public void cancel() {
run = false;
}
}
所以我想从外部调用我的FlinkExtension
class中的方法sendData
,以连续的方式向我的FlinkExtension
写入数据。这是我的 JUnit 测试应该将数据发送到扩展然后将数据写入 SourceContext
。
@Test
public void testSendData(){
FlinkExtension extension = new FlinkExtension();
extension.sendData(new ValueEvent(30));
}
但是如果我运行测试,没有任何反应,应用程序在CustomSourceFunction
的运行方法中挂起。我还尝试在 CustomSourceFunction
运行 方法中创建一个新的无限线程。
总结一下:有人知道如何以连续的方式将数据从应用程序写入 Flink 实例吗?
Flink 源连接器通过其 运行() 方法在 while(运行) 循环中调用 collect()(或 collectWithTimestamp())来发出连续的数据流。如果你想研究一个例子,Apache NiFi 源并不像大多数的那么复杂; here's its run method.
问题是 CustomSourceFunction
方法和 sendData
方法使用了不同的对象实例。因此,context
对象未在方法之间共享,并且添加新的 ValueEvent
不起作用。
要解决此问题,请将 run
方法使用的对象实例存储为 CustomSourceFunction
class 的静态成员变量。当您需要创建一个新的 ValueEvent
时,请在同一对象实例上调用 sendData
方法。
查看下面的示例代码
package RuleSources;
import Rules.Rule;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.ArrayList;
public class DynamicRuleSource extends AlertingRuleSource {
private static DynamicRuleSource sourceObj;
private SourceContext<Rule> ctx;
public static DynamicRuleSource getSourceObject() {
return sourceObj;
}
public void run(SourceContext<Rule> ctx) throws Exception {
this.ctx = ctx;
sourceObj = this;
while(true) {
Thread.sleep(100);
}
}
public void addRule(Rule rule) {
ctx.collect(rule);
}
@Override
public void cancel() {
}
}
添加新规则
public static void addRule(Rule rule) throws Exception {
AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
sourceObject.addRule(rule);
}