如何在 siddhi CEP 中放置到达和摄取时间戳
How to put an arrival and ingestion timestamp in siddhi CEP
我想在流到达 WSO2 IoT 服务器时立即放置到达时间戳,并在 CEP 引擎使用时放置摄取时间戳。这些时间将用于计算 Queuing latency
和 CEP latency
,如下所示。
Queuing latency = ingestion time - arrival time
CEP latency = detection time - ingestion time
下面是我的执行计划
@Plan:name('Server_CEP')
@Plan:statistics('true')
@Plan:trace('true')
@plan:async(bufferSize='1024')
@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);
@Import('stream1_scep:1.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);
@Export('cep_stream_scep:1.0.0')
define stream CEPStream (cep_event int, cepLatency long);
from every ecg = ecg_stream[value_s1 >= 50 ] -> eeg = eeg_stream[value_s2 >= 50] within 10 sec
select ecg.value_s1 as cep_event , convert(time:currentTimestamp(), 'long') - ecg.generationTime_s1 as cepLatency
insert into CEPStream;
我能够找到检测时间为检测到CEP事件时的当前时间。我还使用缓冲区大小为 1024 的 @async
。现在的问题是如何在流到达后立即为流的到达时间添加时间戳。另外,第二个问题是如何放置引擎摄取时间戳。
有人能告诉我如何实现吗?
PS:我能够实现这是一个 Android 设备,因为我使用了非阻塞队列,到达时间是它到达 FIFO 队列的时间,摄取时间是出队时间
建议不要使用 @plan:async(bufferSize='1024')
,因为它将适用于与 Siddhi 应用关联的所有流。因此,仅将 async(buffer.size = '1024')
应用于您想要异步的流。
例如
async(buffer.size = '1024')
define stream <stream name> (...);
现在实现你所要求的。发送使初始流非异步(同步),在查询中使用该流并将当前时间戳注入该事件,它们将结果发送到另一个配置为异步模式的流,最后使用第二个流进行休息的处理。这样,您还可以以同步方式将到达时间添加到事件中。
我通过创建一个 execution plan
来做到这一点,它接收一个流 x
并在其上放置一个时间戳并将其发送到另一个流 y
。这种执行计划的示例代码是
@Plan:name('scep_s1_arrival_timestamping')
@Plan:statistics('false')
@Plan:trace('false')
@Import('stream1_scep:1.0.0')
define stream inputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);
@Export('stream1_scep:2.0.0')
define stream outputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);
from inputStream
select meta_sensorID_s1 as meta_sensorID_s1 , meta_tupleID_s1 as meta_tupleID_s1, value_s1 as value_s1, generationTime_s1 as generationTime_s1, convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
insert into outputStream;
时间戳是使用 convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
完成的。请注意,covert 用于将数据类型转换为 long,然后将其插入到 arrivalTime_s1
变量中。
我想在流到达 WSO2 IoT 服务器时立即放置到达时间戳,并在 CEP 引擎使用时放置摄取时间戳。这些时间将用于计算 Queuing latency
和 CEP latency
,如下所示。
Queuing latency = ingestion time - arrival time
CEP latency = detection time - ingestion time
下面是我的执行计划
@Plan:name('Server_CEP')
@Plan:statistics('true')
@Plan:trace('true')
@plan:async(bufferSize='1024')
@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);
@Import('stream1_scep:1.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);
@Export('cep_stream_scep:1.0.0')
define stream CEPStream (cep_event int, cepLatency long);
from every ecg = ecg_stream[value_s1 >= 50 ] -> eeg = eeg_stream[value_s2 >= 50] within 10 sec
select ecg.value_s1 as cep_event , convert(time:currentTimestamp(), 'long') - ecg.generationTime_s1 as cepLatency
insert into CEPStream;
我能够找到检测时间为检测到CEP事件时的当前时间。我还使用缓冲区大小为 1024 的 @async
。现在的问题是如何在流到达后立即为流的到达时间添加时间戳。另外,第二个问题是如何放置引擎摄取时间戳。
有人能告诉我如何实现吗?
PS:我能够实现这是一个 Android 设备,因为我使用了非阻塞队列,到达时间是它到达 FIFO 队列的时间,摄取时间是出队时间
建议不要使用 @plan:async(bufferSize='1024')
,因为它将适用于与 Siddhi 应用关联的所有流。因此,仅将 async(buffer.size = '1024')
应用于您想要异步的流。
例如
async(buffer.size = '1024')
define stream <stream name> (...);
现在实现你所要求的。发送使初始流非异步(同步),在查询中使用该流并将当前时间戳注入该事件,它们将结果发送到另一个配置为异步模式的流,最后使用第二个流进行休息的处理。这样,您还可以以同步方式将到达时间添加到事件中。
我通过创建一个 execution plan
来做到这一点,它接收一个流 x
并在其上放置一个时间戳并将其发送到另一个流 y
。这种执行计划的示例代码是
@Plan:name('scep_s1_arrival_timestamping')
@Plan:statistics('false')
@Plan:trace('false')
@Import('stream1_scep:1.0.0')
define stream inputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);
@Export('stream1_scep:2.0.0')
define stream outputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);
from inputStream
select meta_sensorID_s1 as meta_sensorID_s1 , meta_tupleID_s1 as meta_tupleID_s1, value_s1 as value_s1, generationTime_s1 as generationTime_s1, convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
insert into outputStream;
时间戳是使用 convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
完成的。请注意,covert 用于将数据类型转换为 long,然后将其插入到 arrivalTime_s1
变量中。