在 Flink 或任何其他系统中合并两种不同类型的数据流
merging datastreams of two different types in Flink or any other system
我想将 Flink 用于远程患者监控案例场景,其中包括各种传感器,如陀螺仪、加速度计、ECG 流、心率流、RR 率等。因此在这种情况下,我们不可能拥有相同的数据类型或输入速率等,但我仍然想检测心律失常或其他涉及在这些多个传感器上进行 CEP 的医疗状况
我所知道的是,如果我想对这些传感器执行一些复杂的事件处理,那么我有 2 个选项需要在 CEP 之前完成
- Join diff streams
- merge diff streams
之前我是根据传感器的时间戳执行连接,但它不会连接所有事件,因为差异流可以具有不同的速率和以微秒为单位的不同时间戳,所以这种情况很少见时间戳完全相等。
所以我想选择选项 # 2,即在执行 CEP 之前执行合并。为此,我在 Flink 文档中发现,我可以 merge the two streams 但它们应该具有相同的数据类型,我尝试这样做但我没有成功,因为出现以下错误
Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)
现在让我们看看我是如何尝试执行合并的。所以基本上我有两个流类,它们的属性如下
RRIntervalStreamEvent 流
public Integer Sensor_id;
public Long time;
public Integer RRInterval;
qrsIntervalStreamEvent 流
public Integer Sensor_id;
public Long time;
public Integer qrsInterval;
这两个流都有生成器 类,它也以相同的数据类型在指定的位置发送事件 rate.Below 是我试图合并它们的代码。
// getting qrs interval stream
DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment.
addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");
// getting RR interval stream
DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment.
addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");
//merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});
我必须使用 new DataStream[]
,因为仅使用 qrs_stream_raw
会导致错误,如下所示。
有人可以给我一个关于
的想法吗
- 我应该如何合并这两个流?
- 我应该如何合并两个以上的流?
- 是否有一些引擎可以合并两个以上具有不同结构的流,如果是,我应该使用哪个引擎
正如 Alex 所指出的,我们可以使用两个流的相同数据类型,并可以在 Flink 中加入它们,另一种选择是使用 Siddhi 或 Flink-Siddhi 扩展。但是我只想在 Flink 中做所有事情
所以我在我的程序中做了一些修改以使其工作
第 1 步: 将我的发电机 类 变为 return 普通类型
public class RR_interval_Gen extends RichParallelSourceFunction<Tuple3<Integer,Long, Integer>>
step# 2: 使两个流生成器都具有 Tuple 类型,然后合并 2 个流。
// getting qrs interval stream
DataStream<Tuple3<Integer,Long,Integer>> qrs_stream_raw = envrionment.
addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");
// getting RR interval stream
DataStream<Tuple3<Integer,Long,Integer>> rr_stream_raw = envrionment.
addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");
//merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream = rr_stream_raw.union(qrs_stream_raw);
我想将 Flink 用于远程患者监控案例场景,其中包括各种传感器,如陀螺仪、加速度计、ECG 流、心率流、RR 率等。因此在这种情况下,我们不可能拥有相同的数据类型或输入速率等,但我仍然想检测心律失常或其他涉及在这些多个传感器上进行 CEP 的医疗状况
我所知道的是,如果我想对这些传感器执行一些复杂的事件处理,那么我有 2 个选项需要在 CEP 之前完成
- Join diff streams
- merge diff streams
之前我是根据传感器的时间戳执行连接,但它不会连接所有事件,因为差异流可以具有不同的速率和以微秒为单位的不同时间戳,所以这种情况很少见时间戳完全相等。
所以我想选择选项 # 2,即在执行 CEP 之前执行合并。为此,我在 Flink 文档中发现,我可以 merge the two streams 但它们应该具有相同的数据类型,我尝试这样做但我没有成功,因为出现以下错误
Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<org.carleton.cep.monitoring.latest.Events.RRIntervalStreamEvent> and GenericType<org.carleton.cep.monitoring.latest.Events.qrsIntervalStreamEvent>
at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)
现在让我们看看我是如何尝试执行合并的。所以基本上我有两个流类,它们的属性如下
RRIntervalStreamEvent 流
public Integer Sensor_id;
public Long time;
public Integer RRInterval;
qrsIntervalStreamEvent 流
public Integer Sensor_id;
public Long time;
public Integer qrsInterval;
这两个流都有生成器 类,它也以相同的数据类型在指定的位置发送事件 rate.Below 是我试图合并它们的代码。
// getting qrs interval stream
DataStream<qrsIntervalStreamEvent> qrs_stream_raw = envrionment.
addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");
// getting RR interval stream
DataStream<RRIntervalStreamEvent> rr_stream_raw = envrionment.
addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");
//merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream;
mergedStream = rr_stream_raw.union(new DataStream[]{qrs_stream_raw});
我必须使用 new DataStream[]
,因为仅使用 qrs_stream_raw
会导致错误,如下所示。
有人可以给我一个关于
的想法吗- 我应该如何合并这两个流?
- 我应该如何合并两个以上的流?
- 是否有一些引擎可以合并两个以上具有不同结构的流,如果是,我应该使用哪个引擎
正如 Alex 所指出的,我们可以使用两个流的相同数据类型,并可以在 Flink 中加入它们,另一种选择是使用 Siddhi 或 Flink-Siddhi 扩展。但是我只想在 Flink 中做所有事情
所以我在我的程序中做了一些修改以使其工作
第 1 步: 将我的发电机 类 变为 return 普通类型
public class RR_interval_Gen extends RichParallelSourceFunction<Tuple3<Integer,Long, Integer>>
step# 2: 使两个流生成器都具有 Tuple 类型,然后合并 2 个流。
// getting qrs interval stream
DataStream<Tuple3<Integer,Long,Integer>> qrs_stream_raw = envrionment.
addSource(new Qrs_interval_Gen(input_rate_qrs_S,Total_Number_Of_Events_in_qrs)).name("qrs stream");
// getting RR interval stream
DataStream<Tuple3<Integer,Long,Integer>> rr_stream_raw = envrionment.
addSource(new RR_interval_Gen(input_rate_rr_S,Total_Number_Of_Events_in_RR)).name("RR stream");
//merging both streams
DataStream<Tuple3<Integer,Long,Integer>> mergedStream = rr_stream_raw.union(qrs_stream_raw);