Esper 多个属性上的不同事件

Esper distinct events on multiple attributes

我对 Esper 中的流语义有疑问。我的目标是只输出具有成对不同属性的事件。此外,还有必须在属性之间保持的时间条件(参见 Espers Interval Algebra Reference)。

示例语句:

insert into output_stream select a.*, b.*
from stream1#length(100) as a, stream2#length(100) as b
where a.before(b) or a.meets(b) or a.overlaps(b)

成对不同的属性意味着,我想确保没有两个输出 o1, o2 where o1.a = o2.a or o1.b = o2.b.举个更具体的例子,如果有结果

o1: (a = a1, b = b1),
o2: (a = a1, b = b2),
o3: (a = a2, b = b2),
o4: (a = a2, b = b1)

只输出其中的两个(例如o1和o3或o2和o4)。哪个暂时无所谓

我想用这样的 NOT EXISTS 子句完成成对不同的属性:

NOT EXISTS ( 
    select * from output_stream#length(100) as otherOutput 
    where a = otherOutput.a or b = otherOutput.b )

部分有效,对于连续输出,断言 o1.a = o2.a or o1.b = o2.b 始终成立。

然而,当stream1首先传递多个"a"然后stream2传递一个"b"时,这符合要与两个[=62=连接的条件]s,一次有多个输出。这不在我的 NOT EXISTS 子句中,因为在同一步骤中会出现具有相同 "b" 的多个输出,因此它们还不在 output_stream.

distinct 关键字在这里不合适,因为它会同时检查所有属性,而不是成对检查。同样,对所有属性使用简单的 group by 也是不合适的。我很想有像 "distinct on a and distinct on b" 这样的标准,但它不存在。

我可以用嵌套 group by 解决这个问题,我在每个属性上分组

select first(*) from (select first(*) from output_stream group by a) group by b

但根据一条评论,在流处理系统中没有明确定义的语义。因此,Esper 不允许在查询的 from 部分进行子查询。

我需要的是一种强制一次只输出一个输出的方法,从而在每个进一步的输出上重新检查 NOT EXISTS 条件,或者以某种方式检查同时发生的输出与另一个输出, 在实际将它们插入流之前。

更新: 输出的时间不是很关键。 output_stream 将被其他此类语句使用,因此我可以通过增加 windows 的长度来解决延迟问题。 stream1stream2 按照 startTimestamp 属性.

的顺序传递事件
create schema Pair(a string, b string);
create window PairWindow#length(100) as Pair;
insert into PairWindow select * from Pair;
on PairWindow as arriving select * from PairWindow as other  
  where arriving.a = other.a or arriving.b = other.b

这是一个使用命名 window 保留最后 100 对的自连接示例。

编辑:以上查询是为我对原始要求的理解而设计的。下面的查询是为新的说明而设计的。它检查 "a" 或 "b" 是否有任何先前的值(在最近的 100 个事件中,根据需要将 #length(100) 关闭)

create schema Pair(a string, b string);
create window PairUniqueByA#firstunique(a)#length(100) as Pair;
create window PairUniqueByB#firstunique(b)#length(100) as Pair;

insert into PairUniqueByA select * from Pair;
insert into PairUniqueByB select * from Pair;

select * from Pair as pair
  where not exists (select a from PairUniqueByA as uba where uba.a = pair.a)
  and not exists (select a from PairUniqueByB as ubb where ubb.b = pair.b);