复杂的事件传播多年
Complex events spread over years
我有一个场景,如果查询的一部分匹配一个事件,我想从数据存储中获取一些其他事件来测试查询的其余部分
例如。 “如果 JANE DOE 从我的商店购买商品,她在过去 3 年中是否购买过其他任何东西”之类的东西。
Flink、Storm或WSO2是否提供对这种复杂事件处理的支持?
Flink 可以做到这一点,但它需要你从你关心的最早的事件(例如 3 年前)开始处理所有事件,这样你就可以为每个客户构建状态。然后 Flink 允许您管理此状态(通常使用 RocksDB),这样您就不必在面对系统故障时重播所有事件。
如果您无法重放所有历史记录,那么通常您会将其放入具有您需要的可扩展性和性能特征的其他存储(Cassandra/HBase、Elasticsearch 等)中,然后使用Flink 的 async 函数支持在收到新事件时查询它。
WSO2 Stream processor let’s you implement such functionality with it’s time incremental analytics 特征。要实现您提到的场景,您可以将客户到达时触发的事件提供给称为“聚合”的结构。当您不断向聚合提供事件时,它会随着时间的推移汇总数据,并将保存在配置的持久性存储中,例如 DB。
您可以查询此聚合以获取给定时间段内的状态。例如,以下查询将获取 2014-2015 年的姓名、购买的商品总数和平均交易价值
from CustomerSummaryRetrievalStream as b join CustoemrAggregation as a
on a.name == b.name
within "2014-01-01 00:00:00 +05:30", "2015-01-01 00:00:00 +05:30"
per “years”
select a.name, a.total, a.avgTxValue
insert into CustomerSummaryStream;
我有一个场景,如果查询的一部分匹配一个事件,我想从数据存储中获取一些其他事件来测试查询的其余部分
例如。 “如果 JANE DOE 从我的商店购买商品,她在过去 3 年中是否购买过其他任何东西”之类的东西。
Flink、Storm或WSO2是否提供对这种复杂事件处理的支持?
Flink 可以做到这一点,但它需要你从你关心的最早的事件(例如 3 年前)开始处理所有事件,这样你就可以为每个客户构建状态。然后 Flink 允许您管理此状态(通常使用 RocksDB),这样您就不必在面对系统故障时重播所有事件。
如果您无法重放所有历史记录,那么通常您会将其放入具有您需要的可扩展性和性能特征的其他存储(Cassandra/HBase、Elasticsearch 等)中,然后使用Flink 的 async 函数支持在收到新事件时查询它。
WSO2 Stream processor let’s you implement such functionality with it’s time incremental analytics 特征。要实现您提到的场景,您可以将客户到达时触发的事件提供给称为“聚合”的结构。当您不断向聚合提供事件时,它会随着时间的推移汇总数据,并将保存在配置的持久性存储中,例如 DB。
您可以查询此聚合以获取给定时间段内的状态。例如,以下查询将获取 2014-2015 年的姓名、购买的商品总数和平均交易价值
from CustomerSummaryRetrievalStream as b join CustoemrAggregation as a
on a.name == b.name
within "2014-01-01 00:00:00 +05:30", "2015-01-01 00:00:00 +05:30"
per “years”
select a.name, a.total, a.avgTxValue
insert into CustomerSummaryStream;