如何在 siddhi CEP 中放置到达和摄取时间戳

How to put an arrival and ingestion timestamp in siddhi CEP

我想在流到达 WSO2 IoT 服务器时立即放置到达时间戳,并在 CEP 引擎使用时放置摄取时间戳。这些时间将用于计算 Queuing latencyCEP latency,如下所示。

Queuing latency = ingestion time - arrival time
CEP latency = detection time - ingestion time

下面是我的执行计划

@Plan:name('Server_CEP')

@Plan:statistics('true')

@Plan:trace('true')

@plan:async(bufferSize='1024')


@Import('stream2_scep:1.0.0')
define stream eeg_stream (meta_sensorID_s2 int, meta_tupleID_s2 int, value_s2 int, generationTime_s2 long);

@Import('stream1_scep:1.0.0')
define stream ecg_stream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);

@Export('cep_stream_scep:1.0.0')
define stream CEPStream (cep_event int, cepLatency long);


from every ecg = ecg_stream[value_s1 >= 50 ] ->  eeg = eeg_stream[value_s2 >= 50] within 10 sec

 select  ecg.value_s1 as  cep_event ,  convert(time:currentTimestamp(), 'long')  - ecg.generationTime_s1  as cepLatency

 insert into CEPStream;

我能够找到检测时间为检测到CEP事件时的当前时间。我还使用缓冲区大小为 1024 的 @async。现在的问题是如何在流到达后立即为流的到达时间添加时间戳。另外,第二个问题是如何放置引擎摄取时间戳。

有人能告诉我如何实现吗?

PS:我能够实现这是一个 Android 设备,因为我使用了非阻塞队列,到达时间是它到达 FIFO 队列的时间,摄取时间是出队时间

建议不要使用 @plan:async(bufferSize='1024'),因为它将适用于与 Siddhi 应用关联的所有流。因此,仅将 async(buffer.size = '1024') 应用于您想要异步的流。

例如

async(buffer.size = '1024')
define stream <stream name> (...);

现在实现你所要求的。发送使初始流非异步(同步),在查询中使用该流并将当前时间戳注入该事件,它们将结果发送到另一个配置为异步模式的流,最后使用第二个流进行休息的处理。这样,您还可以以同步方式将到达时间添加到事件中。

我通过创建一个 execution plan 来做到这一点,它接收一个流 x 并在其上放置一个时间戳并将其发送到另一个流 y。这种执行计划的示例代码是

@Plan:name('scep_s1_arrival_timestamping')

@Plan:statistics('false')

@Plan:trace('false')

@Import('stream1_scep:1.0.0')
define stream inputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long);

@Export('stream1_scep:2.0.0')
define stream outputStream (meta_sensorID_s1 int, meta_tupleID_s1 int, value_s1 int, generationTime_s1 long, arrivalTime_s1 long);

from  inputStream
select meta_sensorID_s1 as meta_sensorID_s1 , meta_tupleID_s1 as meta_tupleID_s1,  value_s1 as value_s1,  generationTime_s1 as generationTime_s1, convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1
insert into outputStream;

时间戳是使用 convert(time:timestampInMilliseconds (),'long') as arrivalTime_s1 完成的。请注意,covert 用于将数据类型转换为 long,然后将其插入到 arrivalTime_s1 变量中。