Apache Flink 中 Join 的输出
Output of Join in Apache Flink
在 Apache Flink 中,如果我在一个主键上连接两个数据集,我会得到一个元组 2,其中包含每个数据集中对应的数据集条目。
问题是,当将 map()
方法应用于输出的元组 2 数据集时,它看起来并不好看,尤其是当两个数据集的条目都具有大量特征时。
在两个输入数据集中使用元组得到这样的代码:
var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */
val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
.map(join => (join._1._1, join._1._2, join._1._3,
join._1._4, join._1._5, join._2._4))
我不介意使用 POJO 或 case 类,但我不明白这会如何让它变得更好。
问题 1:有没有一种很好的方法来展平元组 2?例如。使用另一个运算符。
问题 2:如何处理同一个键上的 3 个数据集的连接?这会使示例源更加混乱。
感谢您的帮助。
您可以直接对每对连接元素应用连接函数,例如
val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
val joined: DataSet[(String, Int, Int)] = leftData
.join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) ) }
要回答第二个问题,Flink 只处理二进制连接。但是,如果您提供有关函数行为的提示,Flink 的优化器可以避免进行不必要的洗牌。 Forward Field annotations 告诉优化器,某些字段(例如连接键)尚未被您的连接函数修改,并允许重用现有的分区和排序。
在 Apache Flink 中,如果我在一个主键上连接两个数据集,我会得到一个元组 2,其中包含每个数据集中对应的数据集条目。
问题是,当将 map()
方法应用于输出的元组 2 数据集时,它看起来并不好看,尤其是当两个数据集的条目都具有大量特征时。
在两个输入数据集中使用元组得到这样的代码:
var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */
val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
.map(join => (join._1._1, join._1._2, join._1._3,
join._1._4, join._1._5, join._2._4))
我不介意使用 POJO 或 case 类,但我不明白这会如何让它变得更好。
问题 1:有没有一种很好的方法来展平元组 2?例如。使用另一个运算符。
问题 2:如何处理同一个键上的 3 个数据集的连接?这会使示例源更加混乱。
感谢您的帮助。
您可以直接对每对连接元素应用连接函数,例如
val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
val joined: DataSet[(String, Int, Int)] = leftData
.join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) ) }
要回答第二个问题,Flink 只处理二进制连接。但是,如果您提供有关函数行为的提示,Flink 的优化器可以避免进行不必要的洗牌。 Forward Field annotations 告诉优化器,某些字段(例如连接键)尚未被您的连接函数修改,并允许重用现有的分区和排序。