使用 Kinesis Analytics 分析事件和相关的缺失事件,及时分离?
Use Kinesis Analytics for analyzing events and related missing events, separated in time?
我有各种设备的事件流,可以是 "connected" 或 "disconnected"。
即事件具有以下结构:
- 时间戳
- device_id
- 事件("connected" 或 "disconnected")
我想在设备断开连接且在(设备特定的可配置)时间段内未连接时立即触发操作,例如1小时。我只想在每个 "disconnected" 事件中触发一次。
这是否可以使用 AWS Kinesis Analytics 完成?如果可以,查询会是什么样子?如果没有,可以使用其他工具完成还是我必须自定义构建它?
这可以通过 Drools Kinesis Analytics(亚马逊上的托管服务)实现:
类型:
package com.test;
import java.util.Set;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
declare DeviceConfig
@DynamoDBTable(tableName="DeviceConfig")
deviceId: int @DynamoDBHashKey(attributeName="device_id");
timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end
declare DeviceEvent
@role( event )
// attributes
deviceId: int;
timestamp: java.util.Date;
event: String;
end
declare DisconnectAlert
deviceId: int;
end
规则:
package com.test;
// setup dynamic timer
rule "disconnect timer"
timer( expr: $timeout )
when
$event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
insertLogical(new DisconnectAlert($event.getDeviceId()));
end
rule "remove dups"
when
$event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
$dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
delete($dup);
end
// on connect event remove "disconnected" state
rule "connect device"
when
$disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
delete($disconnected);
end
// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
$connected : DeviceEvent(event == "connected") from entry-point events
then
delete($connected);
end
请注意,有两种类型的输入:
- DeviceConfig,主要是静态设备配置,位于 DynamoDB 中。
- DeviceEvent,这是设备事件的 Kinesis Stream。
我有各种设备的事件流,可以是 "connected" 或 "disconnected"。
即事件具有以下结构:
- 时间戳
- device_id
- 事件("connected" 或 "disconnected")
我想在设备断开连接且在(设备特定的可配置)时间段内未连接时立即触发操作,例如1小时。我只想在每个 "disconnected" 事件中触发一次。
这是否可以使用 AWS Kinesis Analytics 完成?如果可以,查询会是什么样子?如果没有,可以使用其他工具完成还是我必须自定义构建它?
这可以通过 Drools Kinesis Analytics(亚马逊上的托管服务)实现:
类型:
package com.test;
import java.util.Set;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
declare DeviceConfig
@DynamoDBTable(tableName="DeviceConfig")
deviceId: int @DynamoDBHashKey(attributeName="device_id");
timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end
declare DeviceEvent
@role( event )
// attributes
deviceId: int;
timestamp: java.util.Date;
event: String;
end
declare DisconnectAlert
deviceId: int;
end
规则:
package com.test;
// setup dynamic timer
rule "disconnect timer"
timer( expr: $timeout )
when
$event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
insertLogical(new DisconnectAlert($event.getDeviceId()));
end
rule "remove dups"
when
$event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
$dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
delete($dup);
end
// on connect event remove "disconnected" state
rule "connect device"
when
$disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
delete($disconnected);
end
// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
$connected : DeviceEvent(event == "connected") from entry-point events
then
delete($connected);
end
请注意,有两种类型的输入:
- DeviceConfig,主要是静态设备配置,位于 DynamoDB 中。
- DeviceEvent,这是设备事件的 Kinesis Stream。