ApacheFlink 中的数据集联合

Union of DataSets in ApacheFlink

我正在尝试将 Seq[DataSet(Long,Long,Double)] 联合到 Flink 中的单个 DataSet[(Long,Long,Double)]

     val neighbors= graph.map(el => zKnn.neighbors(results,
      el.vector, 150, metric)).reduce(
     (a, b) => a.union(b)
      ).collect()

其中 graph 是一个常规的 scala 集合,但可以转换为 DataSet; 结果是 DataSet[Vector],不应该收集,在邻居方法中需要

我总是得到一个 FlinkRuntime 异常:

cannot currently handle nodes with more than 64 outputs. org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs. at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:347) at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202

Flink 目前不支持超过 64 个输入数据集的联合运算符。

作为一种解决方法,您可以分层合并最多 64 个数据集,并在层次结构的级别之间注入身份映射器。 类似于:

DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)