如何在 Flink 中实例化 MapStateDescriptor 以计算多个流的平均值查询?
How to instantiate a MapStateDescriptor in Flink to compute averages over multiple streams?
我正在尝试计算 3 个不同房间的平均温度,每个房间都有 3 个温度传感器。我使用的是 Flink(Java)。首先,我以房间(A、B 或 C)作为键对传感器数据进行分组,然后创建一个 RichFlatMapFunction
,其中包含一个 MapState
用来保存温度,直到累积满 3 次测量为止。收到三次测量后,我计算平均值。为了使用 MapState
,我需要一个 MapStateDescriptor,但我不知道如何正确地实例化它
。有人可以帮我解决这个问题吗?谢谢。
/**
 * Streaming job that reads temperature readings from three {@code TemperatureMqttConsumer}
 * sources, tags every reading with a room key and emits one average per room each time
 * {@code AverageTempMapper.THRESHOLD} readings have been collected for that room.
 */
public class SensorsMultipleReadingMqttEdgentQEP2 {

    private boolean checkpointEnable = false;
    private long checkpointInterval = 10000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    /**
     * Builds the pipeline and runs it via {@code env.execute(...)}.
     *
     * @throws Exception if building or executing the job fails
     */
    public SensorsMultipleReadingMqttEdgentQEP2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        if (checkpointEnable) {
            env.enableCheckpointing(checkpointInterval, checkpointMode);
        }

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));

        // Each source gets its own keyed averaging operator; the results are merged afterwards.
        DataStream<Tuple2<String, Double>> averageStream01 = temperatureStream01.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream02 = temperatureStream02.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream03 = temperatureStream03.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());

        DataStream<Tuple2<String, Double>> averageStreams = averageStream01.union(averageStream02)
                .union(averageStream03);
        averageStreams.print();

        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    /**
     * Pairs every reading with a room key derived from the sensor id:
     * ids 1-3 map to "room-A", 4-6 to "room-B", 7-9 to "room-C", anything else to "no-room".
     */
    public static class SensorMatcher implements MapFunction<MqttTemperature, Tuple2<String, MqttTemperature>> {

        private static final long serialVersionUID = 7035756567190539683L;

        /**
         * @param value the incoming reading
         * @return a tuple of (room key, original reading)
         */
        @Override
        public Tuple2<String, MqttTemperature> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                // Unknown sensors are still forwarded, under the "no-room" key.
                System.err.println("Sensor not defined in any room.");
            }
            return new Tuple2<>(key, value);
        }
    }

    /**
     * Accumulates (count, sum) per room key in map state and emits (room key, average)
     * once {@link #THRESHOLD} readings have been collected; the entry is then removed so
     * the next average starts from scratch.
     */
    public static class AverageTempMapper
            extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -4780146677198295204L;

        /** Number of readings that must accumulate before an average is emitted. */
        private static final int THRESHOLD = 3;

        /** Room key -> (number of readings so far, sum of their temperatures). */
        private MapState<String, Tuple2<Integer, Double>> modelState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Tuple2.class is a raw Class<Tuple2> and cannot describe Tuple2<Integer, Double>,
            // so the value type is captured with a TypeHint instead.
            TypeInformation<Tuple2<Integer, Double>> valueType = TypeInformation
                    .of(new TypeHint<Tuple2<Integer, Double>>() {
                    });
            MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor = new MapStateDescriptor<>(
                    "modelState", TypeInformation.of(String.class), valueType);
            modelState = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Adds the reading to the running (count, sum) of its room and emits the average when
         * the threshold is reached.
         *
         * @param value (room key, reading)
         * @param out   receives (room key, average temperature)
         */
        @Override
        public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out)
                throws Exception {
            final String room = value.f0;
            int count = 1;
            double sum = value.f1.getTemp();
            if (modelState.contains(room)) {
                // Fetch the stored accumulator once instead of once per field.
                Tuple2<Integer, Double> previous = modelState.get(room);
                count = previous.f0 + 1;
                sum = previous.f1 + sum;
            }

            if (count >= THRESHOLD) {
                out.collect(Tuple2.of(room, sum / count));
                // Start a fresh batch for this room.
                modelState.remove(room);
            } else {
                modelState.put(room, Tuple2.of(count, sum));
            }
        }
    }
}
我实现了一个自认为合理的方案,至少它可以正常工作。如果有人有更好的方法来计算多个平均值,欢迎留言指教。
/**
 * Streaming job that reads temperature readings from three {@code TemperatureMqttConsumer}
 * sources, tags every reading with a room key and emits one average per room each time
 * {@code AverageTempMapper.THRESHOLD} readings have been collected for that room.
 */
public class SensorsMultipleReadingMqttEdgentQEP2 {

    private boolean checkpointEnable = false;
    private long checkpointInterval = 10000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    /**
     * Builds the pipeline and runs it via {@code env.execute(...)}.
     *
     * @throws Exception if building or executing the job fails
     */
    public SensorsMultipleReadingMqttEdgentQEP2() throws Exception {
        // Obtain the execution environment and run this example in "ingestion time".
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        if (checkpointEnable) {
            env.enableCheckpointing(checkpointInterval, checkpointMode);
        }

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));

        // Each source gets its own keyed averaging operator; the results are merged afterwards.
        DataStream<Tuple2<String, Double>> averageStream01 = temperatureStream01.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream02 = temperatureStream02.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream03 = temperatureStream03.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());

        DataStream<Tuple2<String, Double>> averageStreams = averageStream01.union(averageStream02)
                .union(averageStream03);
        averageStreams.print();

        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    /**
     * Pairs every reading with a room key derived from the sensor id:
     * ids 1-3 map to "room-A", 4-6 to "room-B", 7-9 to "room-C", anything else to "no-room".
     */
    public static class SensorMatcher implements MapFunction<MqttTemperature, Tuple2<String, MqttTemperature>> {

        private static final long serialVersionUID = 7035756567190539683L;

        /**
         * @param value the incoming reading
         * @return a tuple of (room key, original reading)
         */
        @Override
        public Tuple2<String, MqttTemperature> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                // Unknown sensors are still forwarded, under the "no-room" key.
                System.err.println("Sensor not defined in any room.");
            }
            return new Tuple2<>(key, value);
        }
    }

    /**
     * Accumulates (count, sum) per room key in map state and emits (room key, average)
     * once {@link #THRESHOLD} readings have been collected; the entry is then removed so
     * the next average starts from scratch.
     */
    public static class AverageTempMapper
            extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -4780146677198295204L;

        /** Number of readings that must accumulate before an average is emitted. */
        private static final int THRESHOLD = 3;

        /** Room key -> (number of readings so far, sum of their temperatures). */
        private MapState<String, Tuple2<Integer, Double>> modelState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The TypeHint preserves the tuple's field types, which a plain Class object cannot.
            TypeInformation<Tuple2<Integer, Double>> valueType = TypeInformation
                    .of(new TypeHint<Tuple2<Integer, Double>>() {
                    });
            MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor = new MapStateDescriptor<>(
                    "modelState", TypeInformation.of(String.class), valueType);
            modelState = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Adds the reading to the running (count, sum) of its room and emits the average when
         * the threshold is reached.
         *
         * @param value (room key, reading)
         * @param out   receives (room key, average temperature)
         */
        @Override
        public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out)
                throws Exception {
            final String room = value.f0;
            int count = 1;
            double sum = value.f1.getTemp();
            if (modelState.contains(room)) {
                // There is already an accumulator for this room: extend it (single state read).
                Tuple2<Integer, Double> previous = modelState.get(room);
                count = previous.f0 + 1;
                sum = previous.f1 + sum;
            }

            if (count >= THRESHOLD) {
                // Only emit the average once the threshold is reached ...
                out.collect(Tuple2.of(room, sum / count));
                // ... then drop the accumulator so the next batch starts from zero.
                modelState.remove(room);
            } else {
                modelState.put(room, Tuple2.of(count, sum));
            }
        }
    }
}
要定义 MapStateDescriptor
,您可以这样写:
// Value type of the map state: a basic tuple of (Integer, Double).
TupleTypeInfo<Tuple2<Integer, Double>> valueType =
        TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class);
// Descriptor named "modelState" with String keys and the tuple value type above.
MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor =
        new MapStateDescriptor<>("modelState", BasicTypeInfo.STRING_TYPE_INFO, valueType);
// Obtain the state handle from the runtime context and keep it in the field.
this.modelState = getRuntimeContext().getMapState(descriptor);
不过,就您的场景而言,其实没有必要使用 MapState
。由于流已经按键分区(keyed stream),状态本身就是按键隔离的,因此使用 ValueState
就足够了。代码如下所示:
/**
 * Keeps a (count, sum) accumulator in keyed value state and emits (key, average) once
 * {@link #THRESHOLD} readings have been collected; the state is then cleared so that each
 * emitted average covers a fresh batch of readings.
 */
public static class AverageTempMapper extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

    private static final long serialVersionUID = -4780146677198295204L;

    /** Number of readings that must accumulate before an average is emitted. */
    private static final int THRESHOLD = 3;

    /** (number of readings so far, sum of their temperatures) for the current key. */
    private ValueState<Tuple2<Integer, Double>> modelState;

    @Override
    public void open(Configuration parameters) {
        this.modelState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("modelState", TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class)));
    }

    /**
     * Adds the reading to the accumulator of the current key and emits the average when the
     * threshold is reached.
     *
     * @param value (key, reading)
     * @param out   receives (key, average temperature)
     */
    @Override
    public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out) throws Exception {
        // Read the state once; null means nothing has been accumulated for this key yet.
        Tuple2<Integer, Double> state = modelState.value();
        int count = 1;
        double sum = value.f1.getTemp();
        if (state != null) {
            count = state.f0 + 1;
            sum = state.f1 + sum;
        }

        if (count >= THRESHOLD) {
            out.collect(Tuple2.of(value.f0, sum / count));
            // Without this reset every later record would emit an ever-growing running average.
            modelState.clear();
        } else {
            modelState.update(Tuple2.of(count, sum));
        }
    }
}
我正在尝试计算 3 个不同房间的平均温度,每个房间都有 3 个温度传感器。我使用的是 Flink(Java)。首先,我以房间(A、B 或 C)作为键对传感器数据进行分组,然后创建一个 RichFlatMapFunction
,其中包含一个 MapState
用来保存温度,直到累积满 3 次测量为止。收到三次测量后,我计算平均值。为了使用 MapState
,我需要一个 MapStateDescriptor,但我不知道如何正确地实例化它
。有人可以帮我解决这个问题吗?谢谢。
/**
 * Streaming job that reads temperature readings from three {@code TemperatureMqttConsumer}
 * sources, tags every reading with a room key and emits one average per room each time
 * {@code AverageTempMapper.THRESHOLD} readings have been collected for that room.
 */
public class SensorsMultipleReadingMqttEdgentQEP2 {

    private boolean checkpointEnable = false;
    private long checkpointInterval = 10000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    /**
     * Builds the pipeline and runs it via {@code env.execute(...)}.
     *
     * @throws Exception if building or executing the job fails
     */
    public SensorsMultipleReadingMqttEdgentQEP2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        if (checkpointEnable) {
            env.enableCheckpointing(checkpointInterval, checkpointMode);
        }

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));

        // Each source gets its own keyed averaging operator; the results are merged afterwards.
        DataStream<Tuple2<String, Double>> averageStream01 = temperatureStream01.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream02 = temperatureStream02.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream03 = temperatureStream03.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());

        DataStream<Tuple2<String, Double>> averageStreams = averageStream01.union(averageStream02)
                .union(averageStream03);
        averageStreams.print();

        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    /**
     * Pairs every reading with a room key derived from the sensor id:
     * ids 1-3 map to "room-A", 4-6 to "room-B", 7-9 to "room-C", anything else to "no-room".
     */
    public static class SensorMatcher implements MapFunction<MqttTemperature, Tuple2<String, MqttTemperature>> {

        private static final long serialVersionUID = 7035756567190539683L;

        /**
         * @param value the incoming reading
         * @return a tuple of (room key, original reading)
         */
        @Override
        public Tuple2<String, MqttTemperature> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                // Unknown sensors are still forwarded, under the "no-room" key.
                System.err.println("Sensor not defined in any room.");
            }
            return new Tuple2<>(key, value);
        }
    }

    /**
     * Accumulates (count, sum) per room key in map state and emits (room key, average)
     * once {@link #THRESHOLD} readings have been collected; the entry is then removed so
     * the next average starts from scratch.
     */
    public static class AverageTempMapper
            extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -4780146677198295204L;

        /** Number of readings that must accumulate before an average is emitted. */
        private static final int THRESHOLD = 3;

        /** Room key -> (number of readings so far, sum of their temperatures). */
        private MapState<String, Tuple2<Integer, Double>> modelState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Tuple2.class is a raw Class<Tuple2> and cannot describe Tuple2<Integer, Double>,
            // so the value type is captured with a TypeHint instead.
            TypeInformation<Tuple2<Integer, Double>> valueType = TypeInformation
                    .of(new TypeHint<Tuple2<Integer, Double>>() {
                    });
            MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor = new MapStateDescriptor<>(
                    "modelState", TypeInformation.of(String.class), valueType);
            modelState = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Adds the reading to the running (count, sum) of its room and emits the average when
         * the threshold is reached.
         *
         * @param value (room key, reading)
         * @param out   receives (room key, average temperature)
         */
        @Override
        public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out)
                throws Exception {
            final String room = value.f0;
            int count = 1;
            double sum = value.f1.getTemp();
            if (modelState.contains(room)) {
                // Fetch the stored accumulator once instead of once per field.
                Tuple2<Integer, Double> previous = modelState.get(room);
                count = previous.f0 + 1;
                sum = previous.f1 + sum;
            }

            if (count >= THRESHOLD) {
                out.collect(Tuple2.of(room, sum / count));
                // Start a fresh batch for this room.
                modelState.remove(room);
            } else {
                modelState.put(room, Tuple2.of(count, sum));
            }
        }
    }
}
我实现了一个自认为合理的方案,至少它可以正常工作。如果有人有更好的方法来计算多个平均值,欢迎留言指教。
/**
 * Streaming job that reads temperature readings from three {@code TemperatureMqttConsumer}
 * sources, tags every reading with a room key and emits one average per room each time
 * {@code AverageTempMapper.THRESHOLD} readings have been collected for that room.
 */
public class SensorsMultipleReadingMqttEdgentQEP2 {

    private boolean checkpointEnable = false;
    private long checkpointInterval = 10000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    /**
     * Builds the pipeline and runs it via {@code env.execute(...)}.
     *
     * @throws Exception if building or executing the job fails
     */
    public SensorsMultipleReadingMqttEdgentQEP2() throws Exception {
        // Obtain the execution environment and run this example in "ingestion time".
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        if (checkpointEnable) {
            env.enableCheckpointing(checkpointInterval, checkpointMode);
        }

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));

        // Each source gets its own keyed averaging operator; the results are merged afterwards.
        DataStream<Tuple2<String, Double>> averageStream01 = temperatureStream01.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream02 = temperatureStream02.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());
        DataStream<Tuple2<String, Double>> averageStream03 = temperatureStream03.map(new SensorMatcher()).keyBy(0)
                .flatMap(new AverageTempMapper());

        DataStream<Tuple2<String, Double>> averageStreams = averageStream01.union(averageStream02)
                .union(averageStream03);
        averageStreams.print();

        env.execute("SensorsMultipleReadingMqttEdgentQEP");
    }

    /**
     * Pairs every reading with a room key derived from the sensor id:
     * ids 1-3 map to "room-A", 4-6 to "room-B", 7-9 to "room-C", anything else to "no-room".
     */
    public static class SensorMatcher implements MapFunction<MqttTemperature, Tuple2<String, MqttTemperature>> {

        private static final long serialVersionUID = 7035756567190539683L;

        /**
         * @param value the incoming reading
         * @return a tuple of (room key, original reading)
         */
        @Override
        public Tuple2<String, MqttTemperature> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                // Unknown sensors are still forwarded, under the "no-room" key.
                System.err.println("Sensor not defined in any room.");
            }
            return new Tuple2<>(key, value);
        }
    }

    /**
     * Accumulates (count, sum) per room key in map state and emits (room key, average)
     * once {@link #THRESHOLD} readings have been collected; the entry is then removed so
     * the next average starts from scratch.
     */
    public static class AverageTempMapper
            extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

        private static final long serialVersionUID = -4780146677198295204L;

        /** Number of readings that must accumulate before an average is emitted. */
        private static final int THRESHOLD = 3;

        /** Room key -> (number of readings so far, sum of their temperatures). */
        private MapState<String, Tuple2<Integer, Double>> modelState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // The TypeHint preserves the tuple's field types, which a plain Class object cannot.
            TypeInformation<Tuple2<Integer, Double>> valueType = TypeInformation
                    .of(new TypeHint<Tuple2<Integer, Double>>() {
                    });
            MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor = new MapStateDescriptor<>(
                    "modelState", TypeInformation.of(String.class), valueType);
            modelState = getRuntimeContext().getMapState(descriptor);
        }

        /**
         * Adds the reading to the running (count, sum) of its room and emits the average when
         * the threshold is reached.
         *
         * @param value (room key, reading)
         * @param out   receives (room key, average temperature)
         */
        @Override
        public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out)
                throws Exception {
            final String room = value.f0;
            int count = 1;
            double sum = value.f1.getTemp();
            if (modelState.contains(room)) {
                // There is already an accumulator for this room: extend it (single state read).
                Tuple2<Integer, Double> previous = modelState.get(room);
                count = previous.f0 + 1;
                sum = previous.f1 + sum;
            }

            if (count >= THRESHOLD) {
                // Only emit the average once the threshold is reached ...
                out.collect(Tuple2.of(room, sum / count));
                // ... then drop the accumulator so the next batch starts from zero.
                modelState.remove(room);
            } else {
                modelState.put(room, Tuple2.of(count, sum));
            }
        }
    }
}
要定义 MapStateDescriptor
,您可以这样写:
// Value type of the map state: a basic tuple of (Integer, Double).
TupleTypeInfo<Tuple2<Integer, Double>> valueType =
        TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class);
// Descriptor named "modelState" with String keys and the tuple value type above.
MapStateDescriptor<String, Tuple2<Integer, Double>> descriptor =
        new MapStateDescriptor<>("modelState", BasicTypeInfo.STRING_TYPE_INFO, valueType);
// Obtain the state handle from the runtime context and keep it in the field.
this.modelState = getRuntimeContext().getMapState(descriptor);
不过,就您的场景而言,其实没有必要使用 MapState
。由于流已经按键分区(keyed stream),状态本身就是按键隔离的,因此使用 ValueState
就足够了。代码如下所示:
/**
 * Keeps a (count, sum) accumulator in keyed value state and emits (key, average) once
 * {@link #THRESHOLD} readings have been collected; the state is then cleared so that each
 * emitted average covers a fresh batch of readings.
 */
public static class AverageTempMapper extends RichFlatMapFunction<Tuple2<String, MqttTemperature>, Tuple2<String, Double>> {

    private static final long serialVersionUID = -4780146677198295204L;

    /** Number of readings that must accumulate before an average is emitted. */
    private static final int THRESHOLD = 3;

    /** (number of readings so far, sum of their temperatures) for the current key. */
    private ValueState<Tuple2<Integer, Double>> modelState;

    @Override
    public void open(Configuration parameters) {
        this.modelState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("modelState", TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Double.class)));
    }

    /**
     * Adds the reading to the accumulator of the current key and emits the average when the
     * threshold is reached.
     *
     * @param value (key, reading)
     * @param out   receives (key, average temperature)
     */
    @Override
    public void flatMap(Tuple2<String, MqttTemperature> value, Collector<Tuple2<String, Double>> out) throws Exception {
        // Read the state once; null means nothing has been accumulated for this key yet.
        Tuple2<Integer, Double> state = modelState.value();
        int count = 1;
        double sum = value.f1.getTemp();
        if (state != null) {
            count = state.f0 + 1;
            sum = state.f1 + sum;
        }

        if (count >= THRESHOLD) {
            out.collect(Tuple2.of(value.f0, sum / count));
            // Without this reset every later record would emit an ever-growing running average.
            modelState.clear();
        } else {
            modelState.update(Tuple2.of(count, sum));
        }
    }
}