Apache Flink 用户定义警报的最佳实践
Best practice for Apache Flink for user defined alerts
假设我的 Flink 作业接收到一系列股票价格(作为示例)并在股票跌破特定价格时发出警报。用户可以添加或删除这些警报条件。例如,用户 abc@somemail.com
创建了一个规则,当 GME
的价格跌至 100 美元以下时会收到警报。我的 Flink 作业如何以可扩展的方式动态跟踪所有这些警报条件?
我可以创建一个 API,我的 Flink 作业可以调用它来获取所有更新的警报标准,但这意味着要多次调用 API 以使所有内容保持最新状态。
或者我可以使用 Flink Table API 创建一个永久的 table,一旦用户创建新的警报条件,另一个 Flink 作业就会更新。
这个用例的最佳实践是什么?
备注:
- 警报应该以最短的延迟发出
- 警报标准应在用户创建后立即更新。
我认为这取决于您通常如何生成警报。我的第一个想法是使用 Kafka 来存储新的警报,以便 Flink 可以将它们作为 Stream 接收。然后,根据要求,您可以简单地 broadcast
警报流和 connect
股票价格流。这应该可以让您很好地扩展。
但是如果你使用 Table API,那么使用外部 Table 来存储数据可能也是一个好主意,那么你可以看看类似的东西that.
这是纯流式方法的设计草图:
alertUpdates = alerts
.keyBy(user)
.process(managePreviousAlerts) // uses MapState<Stock, Price>
.keyBy(stock, price)
priceUpdates = prices
.keyBy(stock)
.process(managePriceHistory)
.keyBy(stock, price)
alertUpdates
.connect(priceUpdates)
.process(manageAlertsAndPrices) // uses MapState<User, Boolean>
managePreviousAlerts
维护每个用户 MapState
从股票到警报价格。当新警报到达时,找到该股票(对于该用户)的现有警报(如果有)。然后发出两个 AlertUpdates
:一个 RemoveAlert
事件(user, stock, oldAlertPrice)和一个 AddAlert
事件(user, stock, newAlertPrice)。
managePriceHistory
在状态中保留一些每只股票的定价历史记录,并使用一些业务逻辑来决定传入的价格是否是值得触发警报的变化。 (例如,也许您只在价格下跌时发出警报。)
manageAlertsAndPrices
维护每个股票、每个价格 MapState
,由用户键入。
- 这个
MapState
的键是所有以这个价格提醒这只股票的用户。收到 PriceUpdate
后,通过遍历 MapState
. 的键来提醒所有这些用户
- 收到
RemoveAlert
后,将用户从 MapState
中删除。
- 收到
AddAlert
后,将用户添加到 MapState
。
这应该可以很好地扩展。延迟将由 keyBys 引起的两次网络洗牌控制。
假设我的 Flink 作业接收到一系列股票价格(作为示例)并在股票跌破特定价格时发出警报。用户可以添加或删除这些警报条件。例如,用户 abc@somemail.com
创建了一个规则,当 GME
的价格跌至 100 美元以下时会收到警报。我的 Flink 作业如何以可扩展的方式动态跟踪所有这些警报条件?
我可以创建一个 API,我的 Flink 作业可以调用它来获取所有更新的警报标准,但这意味着要多次调用 API 以使所有内容保持最新状态。
或者我可以使用 Flink Table API 创建一个永久的 table,一旦用户创建新的警报条件,另一个 Flink 作业就会更新。
这个用例的最佳实践是什么?
备注:
- 警报应该以最短的延迟发出
- 警报标准应在用户创建后立即更新。
我认为这取决于您通常如何生成警报。我的第一个想法是使用 Kafka 来存储新的警报,以便 Flink 可以将它们作为 Stream 接收。然后,根据要求,您可以简单地 broadcast
警报流和 connect
股票价格流。这应该可以让您很好地扩展。
但是如果你使用 Table API,那么使用外部 Table 来存储数据可能也是一个好主意,那么你可以看看类似的东西that.
这是纯流式方法的设计草图:
alertUpdates = alerts
.keyBy(user)
.process(managePreviousAlerts) // uses MapState<Stock, Price>
.keyBy(stock, price)
priceUpdates = prices
.keyBy(stock)
.process(managePriceHistory)
.keyBy(stock, price)
alertUpdates
.connect(priceUpdates)
.process(manageAlertsAndPrices) // uses MapState<User, Boolean>
managePreviousAlerts
维护每个用户MapState
从股票到警报价格。当新警报到达时,找到该股票(对于该用户)的现有警报(如果有)。然后发出两个AlertUpdates
:一个RemoveAlert
事件(user, stock, oldAlertPrice)和一个AddAlert
事件(user, stock, newAlertPrice)。managePriceHistory
在状态中保留一些每只股票的定价历史记录,并使用一些业务逻辑来决定传入的价格是否是值得触发警报的变化。 (例如,也许您只在价格下跌时发出警报。)manageAlertsAndPrices
维护每个股票、每个价格MapState
,由用户键入。- 这个
MapState
的键是所有以这个价格提醒这只股票的用户。收到PriceUpdate
后,通过遍历MapState
. 的键来提醒所有这些用户
- 收到
RemoveAlert
后,将用户从MapState
中删除。 - 收到
AddAlert
后,将用户添加到MapState
。
- 这个
这应该可以很好地扩展。延迟将由 keyBys 引起的两次网络洗牌控制。