如何在 Scala 中的数据框中获取成对的 x 值?
How to get x values as a pair on a dataframe in scala?
我有一个使用 spark 在 scala 中包含 27770 条目的数据框。此数据框仅包含一列整数。我想单独合并此列,以便生成一个新的数据框,每 2 个值都有对。我想为数据框中的每一行执行此操作。我正在尝试通过以下代码执行此操作:
for (elem1 <- nodeDf.collect()) {
for (elem2 <- nodeDf.collect()) {
if(elem1 != elem2 && elem2 > elem1) {
//get pair elem1, elem2
}
}
}
Intellij 向我显示有关“>”运算符的错误,表示 'cannot resolve symbol'.
我做错了什么?如何为所有值的每个组合获得一个包含两列的新数据框?
例如:输入数据框包含
1
2
3
我想得到一个新的数据框,如下所示:
1,2
1,3
2,3
我想跳过像 1,1 、 2,2 或 2,1 这样的对,因为我已经准备好了 1,2 这对我来说是一样的。
谢谢。
您可以对 DataFrame
本身做一个 carstesian product。
val result =
df.as("a").crossJoin(
df.as("b")
).filter(
($"a.id" =!= $"b.id") && ($"b.id" > $"a.id")
)
您的代码不起作用的原因是,在 DataFrame
上执行 collect
会得到一个 Array[Row]
而没有这样的 >
Row
.
中的方法
您可以通过使用 .as[Int]
将 DataFrame
转换为 Dataset[Int]
或使用 elem1.getAsInt(0)
获取行的元素来修复它,但是...
不要那样做!.
收集returns你所有的分布式数据给驱动程序,这不仅是危险的,而且会破坏 Spark 本身的所有目的。
此外,更不用说对相同数据进行双重 collect
是无用且昂贵的。
您需要交叉加入相同的 ds。在您可以将 where 子句写入 returns 之后,仅在两列之间具有不同编号的行以及仅具有 ANumber 小于 BNumber 的行。
这是一个例子:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.scalatest.FunSuite
class Test extends FunSuite {
test("Test spark cross join") {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val rows = Seq(Row(1),Row(2),Row(3))
val schema = StructType(Seq(StructField("Number",DataTypes.IntegerType)))
val ds = spark.createDataset(rows)(RowEncoder(schema))
val crossJoinDs = ds.select($"Number".as("ANumber"))
.crossJoin(ds.select($"Number".as("BNumber")))
.where($"ANumber" =!= $"BNumber" && $"ANumber" < $"BNumber")
.map(r => String.valueOf(r(0))+","+String.valueOf(r(1)))(Encoders.STRING)
crossJoinDs.show()
}
打印以下输出:
+-----+
|value|
+-----+
| 1,2|
| 1,3|
| 2,3|
+-----+
当您编写收集并迭代结果时,您会将所有数据发送到驱动程序节点。基本上你停止了作品的分布式计算。
好的!最后,我找到了。我只需要像下面这样做一个 sql 查询:
result.createOrReplaceTempView("pairs")
var pairsDF = result.sqlContext.sql("select * from pairs a, pairs b where a.id < b.id").toDF("id_from","id_to")
结果:
对每一行的结果进行了测试,发现可以正常工作!谢谢大家。
我有一个使用 spark 在 scala 中包含 27770 条目的数据框。此数据框仅包含一列整数。我想单独合并此列,以便生成一个新的数据框,每 2 个值都有对。我想为数据框中的每一行执行此操作。我正在尝试通过以下代码执行此操作:
for (elem1 <- nodeDf.collect()) {
for (elem2 <- nodeDf.collect()) {
if(elem1 != elem2 && elem2 > elem1) {
//get pair elem1, elem2
}
}
}
Intellij 向我显示有关“>”运算符的错误,表示 'cannot resolve symbol'.
我做错了什么?如何为所有值的每个组合获得一个包含两列的新数据框?
例如:输入数据框包含
1
2
3
我想得到一个新的数据框,如下所示:
1,2
1,3
2,3
我想跳过像 1,1 、 2,2 或 2,1 这样的对,因为我已经准备好了 1,2 这对我来说是一样的。
谢谢。
您可以对 DataFrame
本身做一个 carstesian product。
val result =
df.as("a").crossJoin(
df.as("b")
).filter(
($"a.id" =!= $"b.id") && ($"b.id" > $"a.id")
)
您的代码不起作用的原因是,在 DataFrame
上执行 collect
会得到一个 Array[Row]
而没有这样的 >
Row
.
中的方法
您可以通过使用 .as[Int]
将 DataFrame
转换为 Dataset[Int]
或使用 elem1.getAsInt(0)
获取行的元素来修复它,但是...
不要那样做!.
收集returns你所有的分布式数据给驱动程序,这不仅是危险的,而且会破坏 Spark 本身的所有目的。
此外,更不用说对相同数据进行双重 collect
是无用且昂贵的。
您需要交叉加入相同的 ds。在您可以将 where 子句写入 returns 之后,仅在两列之间具有不同编号的行以及仅具有 ANumber 小于 BNumber 的行。 这是一个例子:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.scalatest.FunSuite
class Test extends FunSuite {
test("Test spark cross join") {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val rows = Seq(Row(1),Row(2),Row(3))
val schema = StructType(Seq(StructField("Number",DataTypes.IntegerType)))
val ds = spark.createDataset(rows)(RowEncoder(schema))
val crossJoinDs = ds.select($"Number".as("ANumber"))
.crossJoin(ds.select($"Number".as("BNumber")))
.where($"ANumber" =!= $"BNumber" && $"ANumber" < $"BNumber")
.map(r => String.valueOf(r(0))+","+String.valueOf(r(1)))(Encoders.STRING)
crossJoinDs.show()
}
打印以下输出:
+-----+
|value|
+-----+
| 1,2|
| 1,3|
| 2,3|
+-----+
当您编写收集并迭代结果时,您会将所有数据发送到驱动程序节点。基本上你停止了作品的分布式计算。
好的!最后,我找到了。我只需要像下面这样做一个 sql 查询:
result.createOrReplaceTempView("pairs")
var pairsDF = result.sqlContext.sql("select * from pairs a, pairs b where a.id < b.id").toDF("id_from","id_to")
结果:
对每一行的结果进行了测试,发现可以正常工作!谢谢大家。