KSQL查询检查值的完整性
KSQL Query for check value completeness
我有一个包含以下示例值的流,
correlation_id 和 event_type
例子
aud-103 触发器
aud-104 触发器
aud-109 缓解
aud-103 缓解
如果检测到相同 correlation_id 且最新 event_type 的事件正在缓解并且之前已经触发,则将状态设置为已缓解,否则未缓解;
换句话说,只有当事件具有触发和缓解作为事件类型\
时,事件才被认为是缓解的
我需要构建一个 table 以按列的最新值聚合
我设法使用以下(非常)肮脏的查询实现了这一点 \
CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
CASE
WHEN
(LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger')
OR
(LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
THEN 'mitigated'
ELSE 'unmitigated'
END AS MITIGATED_STATUS,
COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;
请问有什么办法可以做到更清洁吗? \
更新
我使用以下查询进行了管理,
SELECT CORRELATIONID , collect_list( EVENTMITIGATIONTYPE ) from SIEM_PARSE_EVENT group by correlationid
HAVING ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'trigger' )
AND ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'mitigation' )
EMIT CHANGES;
我有点喜欢你的解决方案;它是可读的。
作为一些替代方案,您可以使用一系列函数,如 collect_list
、array_intersect
和 array_length
来收集 event_type
,并检查两者是否mitigation
和 trigger
存在...
如果您正在管理自己的 ksqlDB,编写自定义 UDF 或 UDAF 来提供帮助将是另一种选择。
我有一个包含以下示例值的流,
correlation_id 和 event_type
例子
aud-103 触发器
aud-104 触发器
aud-109 缓解
aud-103 缓解
如果检测到相同 correlation_id 且最新 event_type 的事件正在缓解并且之前已经触发,则将状态设置为已缓解,否则未缓解;
换句话说,只有当事件具有触发和缓解作为事件类型\
我需要构建一个 table 以按列的最新值聚合
我设法使用以下(非常)肮脏的查询实现了这一点 \
CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
CASE
WHEN
(LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger')
OR
(LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
THEN 'mitigated'
ELSE 'unmitigated'
END AS MITIGATED_STATUS,
COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;
请问有什么办法可以做到更清洁吗? \
更新
我使用以下查询进行了管理,
SELECT CORRELATIONID , collect_list( EVENTMITIGATIONTYPE ) from SIEM_PARSE_EVENT group by correlationid
HAVING ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'trigger' )
AND ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'mitigation' )
EMIT CHANGES;
我有点喜欢你的解决方案;它是可读的。
作为一些替代方案,您可以使用一系列函数,如 collect_list
、array_intersect
和 array_length
来收集 event_type
,并检查两者是否mitigation
和 trigger
存在...
如果您正在管理自己的 ksqlDB,编写自定义 UDF 或 UDAF 来提供帮助将是另一种选择。