Spark with Scala:通过在每个可能的对上执行函数来计算 table

Spark with Scala: compute a table by executing function on each possible pair

我是 Scala/Spark 的新手,我正在尝试从头开始创建一个 Spark 应用程序来计算 n 组整数之间的确切 Jaccard similarity(您不需要知道回答这个问题是什么)。

我有一个数据框,其中每一行都是一组整数,例如:

var sets = List(Set(1, 5, 7, 4), Set(3, 5, 0), Set(10, 1, 5)).toDF

和一个函数 jacsim(s1, s2) return 是两个集合之间的 Jaccard 相似度。我想定义一个函数,给定 sets 数据帧 returns 另一个数据帧,该数据帧在位置 (i, j) 处包含 jacsim(sets(i), sets(j)) 的结果。我该怎么做?

另外:将生成的数据帧用作 table 是个好主意吗?我读到 Spark 不“喜欢”索引访问的行,因为这会阻碍并行性。我应该 return 一个只有一行的数据框,每个可能的对作为一个新列吗?

如您所述,不允许使用索引访问 spark 数据帧。 这是使用 scala spark 数据框的一种解决方案:

var sets = List(Set(1, 5, 7, 4), Set(3, 5, 0), Set(10, 1, 5)).toDF("sets")
    .withColumn("i",monotonically_increasing_id()) // to create indexes


val jaccardSimUDF = udf((set1: Seq[Int], set2: Seq[Int]) => set1.sum +  set2.sum) // dummy function, replace it with your implementation of Jaccard similarity

val resDF = sets.crossJoin(sets.withColumnRenamed("sets", "sets2").withColumnRenamed("i", "j"))
                .withColumn("jaccardSim", jaccardSimUDF($"sets", $"sets2"))

基本上,我们需要将您的数据框与自身进行交叉连接,以获得所有组合。然后我们可以应用“用户定义函数”(UDF) 来计算 jaccard 相似度。请注意,我为方便起见创建了索引。

现在,如果你真的想要一个矩阵,你将需要重塑这个数据框,但这不是 spark 本质。

如评论中指出jaccard相似度函数是对称的 所以你可以过滤不必要的索引,像这样:

val resDF = sets.crossJoin(sets.withColumnRenamed("sets", "sets2").withColumnRenamed("i", "j"))
            .filter($"i" < $"j")
            .withColumn("jaccardSim", jaccardSimUDF($"sets", $"sets2"))

它可能看起来很难看,因为它仍然涉及完全交叉连接,但由于 spark 依赖于惰性计算和 Catalyst 优化器,它在实践中并不是真正的完全交叉连接。所以我认为没有更好的解决方案。