ApacheFlink 中的数据集联合
Union of DataSets in ApacheFlink
我正在尝试将 Seq[DataSet(Long,Long,Double)]
联合到
Flink 中的单个 DataSet[(Long,Long,Double)]
:
val neighbors= graph.map(el => zKnn.neighbors(results,
el.vector, 150, metric)).reduce(
(a, b) => a.union(b)
).collect()
其中 graph 是一个常规的 scala 集合,但可以转换为 DataSet;
结果是 DataSet[Vector]
,不应该收集,在邻居方法中需要
我总是得到一个 FlinkRuntime 异常:
cannot currently handle nodes with more than 64 outputs.
org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs.
at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:347)
at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202
Flink 目前不支持超过 64 个输入数据集的联合运算符。
作为一种解决方法,您可以分层合并最多 64 个数据集,并在层次结构的级别之间注入身份映射器。
类似于:
DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)
我正在尝试将 Seq[DataSet(Long,Long,Double)]
联合到
Flink 中的单个 DataSet[(Long,Long,Double)]
:
val neighbors= graph.map(el => zKnn.neighbors(results,
el.vector, 150, metric)).reduce(
(a, b) => a.union(b)
).collect()
其中 graph 是一个常规的 scala 集合,但可以转换为 DataSet;
结果是 DataSet[Vector]
,不应该收集,在邻居方法中需要
我总是得到一个 FlinkRuntime 异常:
cannot currently handle nodes with more than 64 outputs. org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs. at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:347) at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202
Flink 目前不支持超过 64 个输入数据集的联合运算符。
作为一种解决方法,您可以分层合并最多 64 个数据集,并在层次结构的级别之间注入身份映射器。 类似于:
DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)