如何在 Scala 中的数据框中获取成对的 x 值?

How to get x values as a pair on a dataframe in scala?

我有一个使用 spark 在 scala 中包含 27770 条目的数据框。此数据框仅包含一列整数。我想单独合并此列,以便生成一个新的数据框,每 2 个值都有对。我想为数据框中的每一行执行此操作。我正在尝试通过以下代码执行此操作:

for (elem1 <- nodeDf.collect()) {
  for (elem2 <- nodeDf.collect()) {
      if(elem1 != elem2 && elem2 > elem1) {
        //get pair elem1, elem2
      }
  }
}  

Intellij 向我显示有关“>”运算符的错误,表示 'cannot resolve symbol'.
我做错了什么?如何为所有值的每个组合获得一个包含两列的新数据框?

例如:输入数据框包含

1
2
3

我想得到一个新的数据框,如下所示:

1,2
1,3
2,3

我想跳过像 1,1 、 2,2 或 2,1 这样的对,因为我已经准备好了 1,2 这对我来说是一样的。

谢谢。

您可以对 DataFrame 本身做一个 carstesian product

val result =
  df.as("a").crossJoin(
    df.as("b")
  ).filter(
    ($"a.id" =!= $"b.id") && ($"b.id" > $"a.id")
  )

您的代码不起作用的原因是,在 DataFrame 上执行 collect 会得到一个 Array[Row] 而没有这样的 > Row.
中的方法 您可以通过使用 .as[Int]DataFrame 转换为 Dataset[Int] 或使用 elem1.getAsInt(0) 获取行的元素来修复它,但是...

不要那样做!.
收集returns你所有的分布式数据给驱动程序,这不仅是危险的,而且会破坏 Spark 本身的所有目的。
此外,更不用说对相同数据进行双重 collect 是无用且昂贵的

您需要交叉加入相同的 ds。在您可以将 where 子句写入 returns 之后,仅在两列之间具有不同编号的行以及仅具有 ANumber 小于 BNumber 的行。 这是一个例子:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.scalatest.FunSuite

class Test extends FunSuite { 

  test("Test spark cross join") {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val rows = Seq(Row(1),Row(2),Row(3))
    val schema = StructType(Seq(StructField("Number",DataTypes.IntegerType)))
    val ds = spark.createDataset(rows)(RowEncoder(schema))

    val crossJoinDs = ds.select($"Number".as("ANumber"))
      .crossJoin(ds.select($"Number".as("BNumber")))
      .where($"ANumber" =!=  $"BNumber" && $"ANumber" < $"BNumber")
      .map(r => String.valueOf(r(0))+","+String.valueOf(r(1)))(Encoders.STRING)

    crossJoinDs.show()

  }

打印以下输出:

+-----+
|value|
+-----+
|  1,2|
|  1,3|
|  2,3|
+-----+

当您编写收集并迭代结果时,您会将所有数据发送到驱动程序节点。基本上你停止了作品的分布式计算。

好的!最后,我找到了。我只需要像下面这样做一个 sql 查询:

result.createOrReplaceTempView("pairs")
var pairsDF = result.sqlContext.sql("select * from pairs a, pairs b where a.id < b.id").toDF("id_from","id_to")  

结果:

对每一行的结果进行了测试,发现可以正常工作!谢谢大家。