使用 Faust 滑动 window

Sliding window using Faust

有谁知道如何使用 Faust 实现滑动 window?

我们的想法是在 10 秒、30 秒、60 秒和 300 秒内计算一个键的出现次数 window,但我们需要在 1 秒或每次更新时计算一次。

我有一个狡猾的解决方法,它看起来非常低效,我有一个 300 秒到期的 1s window,然后我将 table 中的所有旧值加到当前值一种使用 delta() 方法。它似乎可以处理来自 6 个源的消息,每个源 运行 在 10 messages/s,但这是我们看到延迟之前的极限。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群的情况下实现这一点,以及 Kafka cluster.We 试图保持这种简单,如果我们可以。

更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都是动态的。但也许我们只是要求太多而没有针对每个输入的专用过程。

这是我的狡猾代码:

class AlarmCount(faust.Record, serializer='json'):
  event_id: int
  source_id: int
  counts_10: int
  counts_30: int
  counts_60: int
  counts_300: int

@app.agent(events_topic)
async def new_event(stream):
  async for value in stream:
    # calculate the count statistics
    counts_10=0
    counts_30=0
    counts_60=0
    counts_300=0
    
    event_counts_table[value.global_id] += 1
    
    for i in range(300):
      if(i<=10):
        counts_10+=event_counts_table[value.source_id].delta(i)
      if(i<=30):
        counts_30+=event_counts_table[value.source_id].delta(i)
      if(i<=60):
        counts_60+=event_counts_table[value.source_id].delta(i)
      if(i<=300):
        counts_300+=event_counts_table[value.source_id].delta(i)
    
    await event_counts_topic.send(
      value=EventCount(
        event_id=value.event_id,
        source_id=value.source_id,
        counts_10=counts_10,
        counts_30=counts_30,
        counts_60=counts_60,
        counts_300=counts_300
      )
    )

我想遍历所有 windows 以将最后一个值与所有其他过去值的 mean/deviation/other-aggregation 进行比较。

  • 类似于table[key].iter_windows()
  • 并且没有遍历所有 .delta(i)

像您一样,我将实现一个带有时间戳列表的 table。如果列表太大,它将不是最优的,因为 changelog 会很胖。我们应该只流式传输修改的内容,而不是重复每个事件的所有列表。

所以我将创建一个包含详细信息的短期列表和一个包含聚合的长期列表。然后,每次事件只会更新短期列表。

所以好像没有什么好的办法。

我找到的最佳解决方案是将每个 id 的时间戳列表存储在 table 中并将时间戳附加到新事件上,然后删除过期的时间戳,然后 return 长度为新值排在另一个主题之上。

唯一真正的问题是它只捕获事件的每个时间范围内的真实事件计数——理想的情况是每秒实时更新每个时间范围的计数。但我不认为这就是这个系统 is/should/can 的用途 - 它用于事件处理,所以它需要一个事件。我们可以使用计时器功能每秒触发一次重新计数,但这会显着增加处理速度和吞吐量,并且由于警报的触发是针对新事件,所以这无关紧要。有就好,不是必须的。

对于长期统计数据(我们在这里谈论数周和数月),我们决定将所有事件写入数据库,然后定期(每 10 秒)执行类似的任务来查找新事件和过期事件,然后将聚合计数发送到 Kafka 主题以进行额外处理。即使每秒处理 >1000 个事件,每 10 秒处理一次数据也只需要 10-20 毫秒左右,因此这是可管理的,因为 1000/s 是噩梦般的场景,它只会在蓝色月亮中发生一次,然后停止。