在 Apache Flink 中使用外部值作为条件
Using external values as conditions in Apache Flink
我正在构建一个应用程序,它需要汇总来自部署在不同区域的一系列传感器的测量值。这些措施是使用卡夫卡摄取的。我是 Flink 的新手,但我已经想出了如何使用 window 聚合事件并将它们发送到接收器中。但是,我还需要将每个区域的聚合值与来自外部数据库(在我的例子中是 Postgres)的阈值(也是每个区域)进行比较。这些阈值也可以随时间更新或在创建新区域时添加。有什么建议吗?
谢谢
欧元
要从 Postgres 流式传输阈值,您可以设置一个 Table 通过 debezium 连接到 Postgres 的源。 https://www.youtube.com/watch?v=wRIQqgI1gLA is a good resource for learning about that. Or you could do a lookup join, with caching(如果需要的话)。
一旦你获得了阈值流,那么你可以,例如,将阈值的变更日志流建模为版本化 table,并使用 temporal join 将其与主流。
如果出于某种原因无法满足您的需求,您可以将动态 table 转换为 DataStreams,并使用 KeyedCoProcessFunction
实现较低级别的解决方案(更多工作但更灵活)。或者,如果两个数据源之间没有共享密钥,您可以改为 broadcast 阈值。
我正在构建一个应用程序,它需要汇总来自部署在不同区域的一系列传感器的测量值。这些措施是使用卡夫卡摄取的。我是 Flink 的新手,但我已经想出了如何使用 window 聚合事件并将它们发送到接收器中。但是,我还需要将每个区域的聚合值与来自外部数据库(在我的例子中是 Postgres)的阈值(也是每个区域)进行比较。这些阈值也可以随时间更新或在创建新区域时添加。有什么建议吗? 谢谢 欧元
要从 Postgres 流式传输阈值,您可以设置一个 Table 通过 debezium 连接到 Postgres 的源。 https://www.youtube.com/watch?v=wRIQqgI1gLA is a good resource for learning about that. Or you could do a lookup join, with caching(如果需要的话)。
一旦你获得了阈值流,那么你可以,例如,将阈值的变更日志流建模为版本化 table,并使用 temporal join 将其与主流。
如果出于某种原因无法满足您的需求,您可以将动态 table 转换为 DataStreams,并使用 KeyedCoProcessFunction
实现较低级别的解决方案(更多工作但更灵活)。或者,如果两个数据源之间没有共享密钥,您可以改为 broadcast 阈值。