用于消息传递的 Flink 有状态函数地址解析

Flink stateful function address resolution for messaging

在Flink datastream中,假设上游operator托管在machine/task manager m上,上游operator如何知道下游operator所在的机器(任务管理器)m’被托管。是在 JobManager 对作业 sub/tasks(算子)的初始调度期间,在 downstream/upstream 算子之间建立了这样的数据流路径,并且这样的数据流路径在应用程序生命周期内是固定的吗?

更一般地,考虑支持动态消息传递且数据流不固定或未预定义的 Flink 有状态函数,并给定一个具有键 k 的函数需要将 message/event 发送给另一个函数function with key k’ function k 如何找到 function k’ 的地址用于消息传递呢? Flink 运行时是否在某些分布式数据结构(例如 Microsoft Orleans 中的 DHT)中保留了关键机器映射,并且函数的每次调用都涉及对此类数据结构的访问?

请注意,我来自 Spark 背景,在给定 RDD/batch 模型的情况下,作业图任务是连续执行的(在洗牌边界处中断),并且每个洗牌子任务都由持有密钥子集的机器指示该子任务应该 pulled/processed….

谢谢。

即使使用有状态函数,底层 Flink 作业的拓扑在作业启动时也是固定的。每个有状态函数作业都或多或少地使用一个作业图(入口各不相同,但其余的总是这样):

在这里你可以看到所有加载的入口都变成了发出输入消息的 Flink 源操作符, 路由器成为链接到这些源的平面图运算符。

充当路由器的平面图将输入消息转换为内部事件信封,这 本质上只是将消息有效负载与其目标逻辑地址包装起来。信封是 on-the-wire 流经流图的所有消息的数据类型。 Stateful Functions 运行时以函数调度器运算符为中心, 它在所有模块中运行所有加载函数的实例。

在路由器平面图运算符和函数调度程序运算符之间是一个 keyBy 操作 其中 re-partitions 输入流使用目标目的地 id 作为键。这个 network shuffle 保证发送给给定 id 的所有消息都发送到相同的 函数调度运算符的实例。

收到后,函数调度器从信封中提取目标函数地址,加载 该函数实例,然后使用包装的输入(也在 信封)。

函数调度器的不同实例如何相互发送消息?

这是由 co-locating 每个带有 反馈运算符 的函数调度器完成的。 所有传出消息都使用目标函数 id 作为密钥通过另一个网络随机播放。

此反馈运算符在作业图中创建循环或迭代。有状态函数在其消息传递模式中可以有循环或循环,并且不限于使用 DAG 处理数据。

反馈通道已设置检查点;在失败的情况下消息永远不会丢失。

有关这方面的更多信息,我推荐 Tzu-Li (Gordon) Tai 的 Flink Forward 演讲:Stateful Functions: Polyglot Event-Driven Functions for Stateful Distributed Applications。上图来自他的演讲