为什么我在 Flink 中的 MapState 变量不保留以前的值?
Why my MapState variable in Flink is not persisting previous values?
我正在 Java 中实现 Flink 程序以使用 MapStateDescriptor
处理状态。我的实现基于此 source。出于某种原因,MapState
保留了以前的值,我无法计算出我想要的平均值。当我调试时 averageTemp
总是空的,我从来没有在里面找到任何钥匙。我在实施过程中遗漏了什么?
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;
public class SensorsMultipleReadingMqttEdgentQEP {
public SensorsMultipleReadingMqttEdgentQEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
.union(temperatureStream03);
DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
.map(new AverageTempMapper());
average.print();
env.execute("SensorsMultipleReadingMqttEdgentQEP");
}
public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {
private static final long serialVersionUID = 5905504239899133953L;
@Override
public Integer getKey(MqttTemperature value) throws Exception {
return value.getId();
}
}
public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {
private static final long serialVersionUID = -5489672634096634902L;
private MapState<String, Double> averageTemp;
@Override
public void open(Configuration parameters) throws Exception {
averageTemp = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
}
@Override
public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
String key = "no-room";
Double temp = value.getTemp();
if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
key = "room-A";
} else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
key = "room-B";
} else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
key = "room-C";
}
// NEVER ITERATES ON THE averageTemp
for (Map.Entry<String, Double> entry: averageTemp.entries()) {
System.out.println(entry.getKey() + " - " + entry.getValue());
}
System.out.println("value: " + value);
if (averageTemp.contains(key)) { // NEVER CONTAINS A KEY
System.out.println("yes: " + key);
temp = (averageTemp.get(key) + value.getTemp()) / 2;
} else {
averageTemp.put(key, temp);
}
return Tuple2.of(key, temp);
}
}
}
**编辑:**好的。我误解了这个问题。代码将以前的状态保存在 MapState 上。我错了,因为我正在调试代码。但实际上我遇到的问题是它启动了 1 个以上的线程,并且在开始计算平均值之前至少覆盖了我的地图值 3 次。
您的地图函数中的状态基于每个键。因此,当调用 map 函数并获得 map 状态时,它将针对正在处理的 MqttTemperature
记录中的任何 id。
鉴于您需要每个房间的平均温度,我将按如下方式处理:
- 根据 id 字段将
TemperatureKeySelector
更改为 return room-A
、room-B
或 room-C
。
- 在
AverageTempMapper
中,有两个 ValueState
变量 - 一个是温度总和 (Double),另一个是计数。当你的map()
方法被调用时,如果这两个ValueState
变量中有一个为null,则将其初始化为0,然后sum/increment.
我正在 Java 中实现 Flink 程序以使用 MapStateDescriptor
处理状态。我的实现基于此 source。出于某种原因,MapState
保留了以前的值,我无法计算出我想要的平均值。当我调试时 averageTemp
总是空的,我从来没有在里面找到任何钥匙。我在实施过程中遗漏了什么?
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;
public class SensorsMultipleReadingMqttEdgentQEP {
public SensorsMultipleReadingMqttEdgentQEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
.union(temperatureStream03);
DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
.map(new AverageTempMapper());
average.print();
env.execute("SensorsMultipleReadingMqttEdgentQEP");
}
public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {
private static final long serialVersionUID = 5905504239899133953L;
@Override
public Integer getKey(MqttTemperature value) throws Exception {
return value.getId();
}
}
public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {
private static final long serialVersionUID = -5489672634096634902L;
private MapState<String, Double> averageTemp;
@Override
public void open(Configuration parameters) throws Exception {
averageTemp = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
}
@Override
public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
String key = "no-room";
Double temp = value.getTemp();
if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
key = "room-A";
} else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
key = "room-B";
} else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
key = "room-C";
}
// NEVER ITERATES ON THE averageTemp
for (Map.Entry<String, Double> entry: averageTemp.entries()) {
System.out.println(entry.getKey() + " - " + entry.getValue());
}
System.out.println("value: " + value);
if (averageTemp.contains(key)) { // NEVER CONTAINS A KEY
System.out.println("yes: " + key);
temp = (averageTemp.get(key) + value.getTemp()) / 2;
} else {
averageTemp.put(key, temp);
}
return Tuple2.of(key, temp);
}
}
}
**编辑:**好的。我误解了这个问题。代码将以前的状态保存在 MapState 上。我错了,因为我正在调试代码。但实际上我遇到的问题是它启动了 1 个以上的线程,并且在开始计算平均值之前至少覆盖了我的地图值 3 次。
您的地图函数中的状态基于每个键。因此,当调用 map 函数并获得 map 状态时,它将针对正在处理的 MqttTemperature
记录中的任何 id。
鉴于您需要每个房间的平均温度,我将按如下方式处理:
- 根据 id 字段将
TemperatureKeySelector
更改为 returnroom-A
、room-B
或room-C
。 - 在
AverageTempMapper
中,有两个ValueState
变量 - 一个是温度总和 (Double),另一个是计数。当你的map()
方法被调用时,如果这两个ValueState
变量中有一个为null,则将其初始化为0,然后sum/increment.