在 Apache Edgent 中加入不同类型的流
Joining streams of different types in Apache Edgent
我有 3 个流:
TStream<Double> tempReadings=topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings=topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings=topology.poll(stationarySensor, 10, TimeUnit.SECONDS);
我目前从 3 个 JSON 对象创建了 3 个独立的设备事件:
TStream<JsonObject> tempJson=tempReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("Temperature", tuple);
return json;
});
TStream<JsonObject> co2Json=co2Readings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("C02Level", tuple);
return json;
});
TStream<JsonObject> sensoryJson=stationaryReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("isStationary", tuple);
return json;
});
相反,我想通过将这些流连接在一起并创建 1 个具有三个属性(温度、C02Level 和 isStationary)的 JSON 对象来创建单个事件。
您可以 union 流,但这只会一个接一个地放置元组,并且您需要使用相同类型的流。
如果您想一次读取所有 3 个属性,您可以创建一个传感器 returns 一个 "readings" 对象:
class Reading {
Double temperature;
Double c02Level;
Boolean isStationary;
}
在这种情况下,"single poll combined Reading tuple" 方法可能是最好的。
更一般地,PlumbingStreams.barrier()可用于合并多个流的相应元组。类似于:
TStream<JsonObject> combinedReadings =
PlumbingStreams.barrier(Arrays.asList(tempJson,co2Json,sensoryJson))
.map(list -> combineTuples(list));
static JsonObject combineTuples(JsonObject list...) {
JsonObject jo = new JsonObject();
for (JsonObject j : list) {
for (Entry<String,JsonElement> e : j.entrySet()) {
jo.addProperty(e.getKey(), e.getValue());
}
}
return jo;
}
我有 3 个流:
TStream<Double> tempReadings=topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings=topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings=topology.poll(stationarySensor, 10, TimeUnit.SECONDS);
我目前从 3 个 JSON 对象创建了 3 个独立的设备事件:
TStream<JsonObject> tempJson=tempReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("Temperature", tuple);
return json;
});
TStream<JsonObject> co2Json=co2Readings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("C02Level", tuple);
return json;
});
TStream<JsonObject> sensoryJson=stationaryReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("isStationary", tuple);
return json;
});
相反,我想通过将这些流连接在一起并创建 1 个具有三个属性(温度、C02Level 和 isStationary)的 JSON 对象来创建单个事件。
您可以 union 流,但这只会一个接一个地放置元组,并且您需要使用相同类型的流。
如果您想一次读取所有 3 个属性,您可以创建一个传感器 returns 一个 "readings" 对象:
class Reading {
Double temperature;
Double c02Level;
Boolean isStationary;
}
在这种情况下,"single poll combined Reading tuple" 方法可能是最好的。
更一般地,PlumbingStreams.barrier()可用于合并多个流的相应元组。类似于:
TStream<JsonObject> combinedReadings =
PlumbingStreams.barrier(Arrays.asList(tempJson,co2Json,sensoryJson))
.map(list -> combineTuples(list));
static JsonObject combineTuples(JsonObject list...) {
JsonObject jo = new JsonObject();
for (JsonObject j : list) {
for (Entry<String,JsonElement> e : j.entrySet()) {
jo.addProperty(e.getKey(), e.getValue());
}
}
return jo;
}