Apache Flink 用户定义警报的最佳实践

Best practice for Apache Flink for user defined alerts

假设我的 Flink 作业接收到一系列股票价格(作为示例)并在股票跌破特定价格时发出警报。用户可以添加或删除这些警报条件。例如,用户 abc@somemail.com 创建了一个规则,当 GME 的价格跌至 100 美元以下时会收到警报。我的 Flink 作业如何以可扩展的方式动态跟踪所有这些警报条件?

我可以创建一个 API,我的 Flink 作业可以调用它来获取所有更新的警报标准,但这意味着要多次调用 API 以使所有内容保持最新状态。

或者我可以使用 Flink Table API 创建一个永久的 table,一旦用户创建新的警报条件,另一个 Flink 作业就会更新。

这个用例的最佳实践是什么?

备注:

  1. 警报应该以最短的延迟发出
  2. 警报标准应在用户创建后立即更新。

我认为这取决于您通常如何生成警报。我的第一个想法是使用 Kafka 来存储新的警报,以便 Flink 可以将它们作为 Stream 接收。然后,根据要求,您可以简单地 broadcast 警报流和 connect 股票价格流。这应该可以让您很好地扩展。

但是如果你使用 Table API,那么使用外部 Table 来存储数据可能也是一个好主意,那么你可以看看类似的东西that.

这是纯流式方法的设计草图:

alertUpdates = alerts
  .keyBy(user)
  .process(managePreviousAlerts)  // uses MapState<Stock, Price>
  .keyBy(stock, price)

priceUpdates = prices
  .keyBy(stock)
  .process(managePriceHistory)
  .keyBy(stock, price)

alertUpdates
  .connect(priceUpdates)
  .process(manageAlertsAndPrices) // uses MapState<User, Boolean>
  • managePreviousAlerts 维护每个用户 MapState 从股票到警报价格。当新警报到达时,找到该股票(对于该用户)的现有警报(如果有)。然后发出两个 AlertUpdates:一个 RemoveAlert 事件(user, stock, oldAlertPrice)和一个 AddAlert 事件(user, stock, newAlertPrice)。
  • managePriceHistory 在状态中保留一些每只股票的定价历史记录,并使用一些业务逻辑来决定传入的价格是否是值得触发警报的变化。 (例如,也许您只在价格下跌时发出警报。)
  • manageAlertsAndPrices 维护每个股票、每个价格 MapState,由用户键入。
    • 这个 MapState 的键是所有以这个价格提醒这只股票的用户。收到 PriceUpdate 后,通过遍历 MapState.
    • 的键来提醒所有这些用户
    • 收到 RemoveAlert 后,将用户从 MapState 中删除。
    • 收到 AddAlert 后,将用户添加到 MapState

这应该可以很好地扩展。延迟将由 keyBys 引起的两次网络洗牌控制。