Flink join、Scala 上的丰富功能 API
Rich function on Flink join, Scala API
我正在为 Flink 和 Scala 苦苦挣扎。
我对 DataSet
进行了连接转换,效果很好,但我想将其转换为 RichFuntion
,以便我可以访问广播集:
val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
.where("coords").equalTo("cellCoords"){
(cell, neighbours) => {
// Do some rich function things, like
// override the open method so I can get
// the broadcasted set
}
}
}.withBroadcastSet(board, "aliveCells")
我一直在查看整个文档,但我找不到在 Scala 中使用 RichJoinFuntion
的任何示例。我只找到 examples 用于 map
或 filter
中使用的丰富函数,但是 join
转换的语法不同(括号之间的函数与括号之间的函数)。
您可以将 RichJoinFunction
与 Scala 数据集 API 一起使用,如下所示
val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
.where("coords").equalTo("cellCoords")
.apply(new YourJoinFunction())
.withBroadcastSet(board, "aliveCells")
class YourJoinFunction extends RichJoinFunction[IN1, IN2, Cell] {
override def join(first: IN1, second: IN2): Cell = {
// Do some rich function things, like
// override the open method so I can get
// the broadcasted set
}
}
我正在为 Flink 和 Scala 苦苦挣扎。
我对 DataSet
进行了连接转换,效果很好,但我想将其转换为 RichFuntion
,以便我可以访问广播集:
val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
.where("coords").equalTo("cellCoords"){
(cell, neighbours) => {
// Do some rich function things, like
// override the open method so I can get
// the broadcasted set
}
}
}.withBroadcastSet(board, "aliveCells")
我一直在查看整个文档,但我找不到在 Scala 中使用 RichJoinFuntion
的任何示例。我只找到 examples 用于 map
或 filter
中使用的丰富函数,但是 join
转换的语法不同(括号之间的函数与括号之间的函数)。
您可以将 RichJoinFunction
与 Scala 数据集 API 一起使用,如下所示
val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
.where("coords").equalTo("cellCoords")
.apply(new YourJoinFunction())
.withBroadcastSet(board, "aliveCells")
class YourJoinFunction extends RichJoinFunction[IN1, IN2, Cell] {
override def join(first: IN1, second: IN2): Cell = {
// Do some rich function things, like
// override the open method so I can get
// the broadcasted set
}
}