Flink join、Scala 上的丰富功能 API

Rich function on Flink join, Scala API

我正在为 Flink 和 Scala 苦苦挣扎。

我对 DataSet 进行了连接转换,效果很好,但我想将其转换为 RichFuntion,以便我可以访问广播集:

val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
                             .where("coords").equalTo("cellCoords"){

    (cell, neighbours) => {
            // Do some rich function things, like 
            // override the open method so I can get
            // the broadcasted set
    }

  }

}.withBroadcastSet(board, "aliveCells")

我一直在查看整个文档,但我找不到在 Scala 中使用 RichJoinFuntion 的任何示例。我只找到 examples 用于 mapfilter 中使用的丰富函数,但是 join 转换的语法不同(括号之间的函数与括号之间的函数)。

您可以将 RichJoinFunction 与 Scala 数据集 API 一起使用,如下所示

val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
                             .where("coords").equalTo("cellCoords")
                               .apply(new YourJoinFunction())
                               .withBroadcastSet(board, "aliveCells")

class YourJoinFunction extends RichJoinFunction[IN1, IN2, Cell] {
  override def join(first: IN1, second: IN2): Cell = {
    // Do some rich function things, like 
    // override the open method so I can get
    // the broadcasted set
  }
}