是否可以避免交叉转换?

Is it possible to avoid cross transformation?

先生们,

我在 Apache Flink 中使用 Batch,使用 DataSet API,我想计算 DataSet 中所有元素的“相似度”。

让函数 CalculateSimilarity(e1, e2) 计算并 return 元素 e1 和 e2 的相似度。

将数据集与自身交叉工作正常,但是,我浪费了很多时间和不必要的微积分处理。我真的不需要计算所有元素的笛卡尔积,因为,可以做一些改进:

i) 不需要计算相同元素的相似度。例如计算相似度(A,A)
ii) 计算相似度(A,B) ⇔ 计算相似度(B,A)。相似度(A,B)和(B,A)是等价的,我只需要计算其中之一即可。

使用 flink,我如何应用转换来计算必要的相似性而不是所有相似性(交叉)?

如果我在上面不清楚,这里有一个简单的例子:
Dt = 包含 4 个元素的数据集。
Dt = {e1, e2, e3 , e4}.
如果我使用交叉 ( Dt.cross(Dt) ),它 return 是所有这些组合:((e1,e1),(e1,e2),(e1,e3),(e1,e4) ,(e2,e1),(e2,e2),(e2,e3),(e2,e4),(e3,e1),...,(e4,e4)).
但是,我只需要这些组合:(e1,e2),(e1,e3),(e1,e4),(e2,e3),(e2,e4),(e3,e4)。

感谢您的帮助!

您可以做的是手动构造一个避免排列的连接模式。您可以通过为每个元素分配一个递增的索引(0 到元素数量 - 1)然后让每个元素仅与索引低于或等于其自身的元素连接来实现:

val env = ExecutionEnvironment.getExecutionEnvironment

val input = env.fromElements(1, 2, 3, 4, 5, 6).rebalance()

// we first assign an increasing index from 0 to input.size - 1 to each element
val indexedInput = input.zipWithIndex

// here we generate the join set where we say that (idx, element) will be joined with all
// elements whose index is at most idx
val joinSet = indexedInput.flatMap{
  input => for (i <- 0 to input._1.toInt) yield (i.toLong, input._2)
}

// doing the join operation
val resultSet = indexedInput.join(joinSet).where(_._1).equalTo(_._1).apply{
  (a, b) => (a._2, b._2)
}

您应该尝试哪些程序运行得更快,因为 zipWithIndex 将触发单独的作业执行。