如何在流数据流程序运行时更改运算符?
How to change the operators at runtime of a Stream Dataflow program?
我想知道是否可以更改已经提交给Flink的作业的操作员。假设我有一个字数统计程序,并且有一个过滤器只计算大于 3 个字符的字数。我想在运行时更改此过滤器的参数。我的第一个猜测是 Flink(以及其他数据流引擎 Spark、Storm、Apache Edgent)无法做到这一点,因为作业已经在 env.execute()
上提交。有谁知道这样做的方法吗?
我想这个问题(Deploy stream processing topology on runtime?)与我想要的有关,但解决方案仍然不是我想要的动态。
谢谢
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9000)
.flatMap(new SplitterFlatMap()).keyBy(0)
.sum(1)
.filter(word -> word.f1 >= 3);
dataStream.print();
env.execute("WordCountSocketFilterQEP");
我想在 Flink 中我可以使用 CoFlatMapFunction
-> 。但是在 Apache Edgent 中,我不确定是否有办法做到这一点……
这是我的实现>
package org.sense.flink.examples.stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.sense.flink.mqtt.FlinkMqttConsumer;
import org.sense.flink.mqtt.MqttMessage;
public class SensorsDynamicFilterMqttEdgentQEP {
public SensorsDynamicFilterMqttEdgentQEP() throws Exception {
// Start streaming from fake data source sensors
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// obtain execution environment, run this example in "ingestion time"
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<MqttMessage> temperatureStream = env.addSource(new FlinkMqttConsumer("topic-edgent"));
DataStream<Tuple2<Double, Double>> parameterStream = env.addSource(new FlinkMqttConsumer("topic-parameter"))
.map(new ParameterMapper());
DataStream<MqttMessage> filteredStream = temperatureStream.connect(parameterStream.broadcast())
.flatMap(new DynamicFilterCoFlatMapper());
filteredStream.print();
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
env.execute("SensorsDynamicFilterMqttEdgentQEP");
}
public static class DynamicFilterCoFlatMapper
implements CoFlatMapFunction<MqttMessage, Tuple2<Double, Double>, MqttMessage> {
private static final long serialVersionUID = -8634404029870404558L;
private Tuple2<Double, Double> range = new Tuple2<Double, Double>(-1000.0, 1000.0);
@Override
public void flatMap1(MqttMessage value, Collector<MqttMessage> out) throws Exception {
double payload = Double.parseDouble(value.getPayload());
if (payload >= this.range.f0 && payload <= this.range.f1) {
out.collect(value);
}
}
@Override
public void flatMap2(Tuple2<Double, Double> value, Collector<MqttMessage> out) throws Exception {
this.range = value;
}
}
public static class ParameterMapper implements MapFunction<MqttMessage, Tuple2<Double, Double>> {
private static final long serialVersionUID = 7322348505833012711L;
@Override
public Tuple2<Double, Double> map(MqttMessage value) throws Exception {
String[] array = value.getPayload().split(",");
double min = Double.parseDouble(array[0]);
double max = Double.parseDouble(array[1]);
return new Tuple2<Double, Double>(min, max);
}
}
}
使用 Flink,您可以将广播流连接到键控流,并在您要使用的参数或代码中进行广播。 TaxiQuery 是将 Janino 与 Java 表达式结合使用的示例,但您可能可以动态加载 class。我还看到这是用 Rhino/Javascript、JRuby 等
完成的
为了将您的 parameterStream
的值发送给所有运算符,您必须使用 BroadcastStream
。请注意(从 Flink 1.6 开始?)这还允许您保持广播状态,其中 "rules" 或您发送到 DynamicFilterCoFlatMapper
所有实例的配置设置将自动保存为状态。
我想知道是否可以更改已经提交给Flink的作业的操作员。假设我有一个字数统计程序,并且有一个过滤器只计算大于 3 个字符的字数。我想在运行时更改此过滤器的参数。我的第一个猜测是 Flink(以及其他数据流引擎 Spark、Storm、Apache Edgent)无法做到这一点,因为作业已经在 env.execute()
上提交。有谁知道这样做的方法吗?
我想这个问题(Deploy stream processing topology on runtime?)与我想要的有关,但解决方案仍然不是我想要的动态。
谢谢
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9000)
.flatMap(new SplitterFlatMap()).keyBy(0)
.sum(1)
.filter(word -> word.f1 >= 3);
dataStream.print();
env.execute("WordCountSocketFilterQEP");
我想在 Flink 中我可以使用 CoFlatMapFunction
->
package org.sense.flink.examples.stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.sense.flink.mqtt.FlinkMqttConsumer;
import org.sense.flink.mqtt.MqttMessage;
public class SensorsDynamicFilterMqttEdgentQEP {
public SensorsDynamicFilterMqttEdgentQEP() throws Exception {
// Start streaming from fake data source sensors
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// obtain execution environment, run this example in "ingestion time"
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<MqttMessage> temperatureStream = env.addSource(new FlinkMqttConsumer("topic-edgent"));
DataStream<Tuple2<Double, Double>> parameterStream = env.addSource(new FlinkMqttConsumer("topic-parameter"))
.map(new ParameterMapper());
DataStream<MqttMessage> filteredStream = temperatureStream.connect(parameterStream.broadcast())
.flatMap(new DynamicFilterCoFlatMapper());
filteredStream.print();
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
env.execute("SensorsDynamicFilterMqttEdgentQEP");
}
public static class DynamicFilterCoFlatMapper
implements CoFlatMapFunction<MqttMessage, Tuple2<Double, Double>, MqttMessage> {
private static final long serialVersionUID = -8634404029870404558L;
private Tuple2<Double, Double> range = new Tuple2<Double, Double>(-1000.0, 1000.0);
@Override
public void flatMap1(MqttMessage value, Collector<MqttMessage> out) throws Exception {
double payload = Double.parseDouble(value.getPayload());
if (payload >= this.range.f0 && payload <= this.range.f1) {
out.collect(value);
}
}
@Override
public void flatMap2(Tuple2<Double, Double> value, Collector<MqttMessage> out) throws Exception {
this.range = value;
}
}
public static class ParameterMapper implements MapFunction<MqttMessage, Tuple2<Double, Double>> {
private static final long serialVersionUID = 7322348505833012711L;
@Override
public Tuple2<Double, Double> map(MqttMessage value) throws Exception {
String[] array = value.getPayload().split(",");
double min = Double.parseDouble(array[0]);
double max = Double.parseDouble(array[1]);
return new Tuple2<Double, Double>(min, max);
}
}
}
使用 Flink,您可以将广播流连接到键控流,并在您要使用的参数或代码中进行广播。 TaxiQuery 是将 Janino 与 Java 表达式结合使用的示例,但您可能可以动态加载 class。我还看到这是用 Rhino/Javascript、JRuby 等
完成的为了将您的 parameterStream
的值发送给所有运算符,您必须使用 BroadcastStream
。请注意(从 Flink 1.6 开始?)这还允许您保持广播状态,其中 "rules" 或您发送到 DynamicFilterCoFlatMapper
所有实例的配置设置将自动保存为状态。