KSQL查询检查值的完整性

KSQL Query for check value completeness

我有一个包含以下示例值的流,

correlation_id 和 event_type

例子
aud-103 触发器
aud-104 触发器
aud-109 缓解
aud-103 缓解

如果检测到相同 correlation_id 且最新 event_type 的事件正在缓解并且之前已经触发,则将状态设置为已缓解,否则未缓解;
换句话说,只有当事件具有触发和缓解作为事件类型\

时,事件才被认为是缓解的

我需要构建一个 table 以按列的最新值聚合

我设法使用以下(非常)肮脏的查询实现了这一点 \

CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
  SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
  CASE
   WHEN 
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger') 
   OR
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
   THEN 'mitigated'
   ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;

请问有什么办法可以做到更清洁吗? \

更新
我使用以下查询进行了管理,

SELECT  CORRELATIONID , collect_list( EVENTMITIGATIONTYPE ) from SIEM_PARSE_EVENT group by correlationid 
 HAVING ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'trigger' )
  AND ARRAY_CONTAINS( collect_list(EVENTMITIGATIONTYPE), 'mitigation' )
EMIT CHANGES;

我有点喜欢你的解决方案;它是可读的。

作为一些替代方案,您可以使用一系列函数,如 collect_listarray_intersectarray_length 来收集 event_type,并检查两者是否mitigationtrigger 存在...

如果您正在管理自己的 ksqlDB,编写自定义 UDF 或 UDAF 来提供帮助将是另一种选择。