Spark Dataframe:枢轴排序
Spark Dataframe : Pivot with sorting
我正在将以下 json
文件读入 spark 中的 Dataframe:
{"id" : "a", "country" : "uk", "date" : "2016-01-01"}
{"id" : "b", "country" : "uk", "date" : "2016-01-02"}
{"id" : "c", "country" : "fr", "date" : "2016-02-01"}
{"id" : "d", "country" : "de", "date" : "2016-03-01"}
{"id" : "e", "country" : "tk", "date" : "2016-04-01"}
{"id" : "f", "country" : "be", "date" : "2016-05-01"}
{"id" : "g", "country" : "nl", "date" : "2016-06-01"}
{"id" : "h", "country" : "uk", "date" : "2016-06-01"}
然后我在其上应用 groupBy
并根据日期调整它,这是(伪)代码:
val df = spark.read.json("file.json")
val dfWithFormattedDate = df.withColumn("date", date_format(col("date"), "yyyy-MM"))
dfWithFormattedDate.groupBy("country").pivot("date").agg(countDistinct("id").alias("count")).orderBy("country")
这为我提供了 Dataframe
列,其中包含国家/地区和旋转日期(月)。然后我想 order
按总计数的降序排列结果。但是,我没有 count
作为列之一,在 groupBy 上应用 count()
后我无法应用 pivot
因为它 returns Dataset
和不是 RelationalGroupedDataset
。我也尝试了以下方法:
dfWithFormattedDate.groupBy("country").pivot("date").count()
这也不给我 count
列。有什么办法可以在结果数据集中同时提供 count
和旋转日期,以便我可以按计数降序排序?
更新
这是当前输出:
country|2016-01|2016-02|2016-03| ....
fr | null | 1 | null |
be | null | null | null |
uk | 2 | null | null |
这是预期的输出:
country|count|2016-01|2016-02|2016-03| ....
uk | 3 | 2 | null | null |
fr | 1 | null | 1 | null |
be | 1 | null | null | null |
如您所见,我需要结果中的 count
列,并按 count
的降序对行进行排序。没有明确包含 count
列的排序也可以。
如果我们的起点是这样DataFrame
:
import org.apache.spark.sql.functions.{date_format ,col, countDistinct}
val result = df.withColumn("date", date_format(col("date"), "yyyy-MM"))
.groupBy("country").pivot("date").agg(countDistinct("id").alias("count"))
.na.fill(0)
然后我们可以简单地计算除 country
列之外的所有列的行和:
import org.apache.spark.sql.functions.desc
val test = result.withColumn("count",
result.columns.filter(_!="country")
.map(c => col(c))
.reduce((x, y) => x + y))
.orderBy(desc("count"))
test.show()
+-------+-------+-------+-------+-------+-------+-------+-----+
|country|2016-01|2016-02|2016-03|2016-04|2016-05|2016-06|count|
+-------+-------+-------+-------+-------+-------+-------+-----+
| uk| 2| 0| 0| 0| 0| 1| 3|
| be| 0| 0| 0| 0| 1| 0| 1|
| de| 0| 0| 1| 0| 0| 0| 1|
| tk| 0| 0| 0| 1| 0| 0| 1|
| nl| 0| 0| 0| 0| 0| 1| 1|
| fr| 0| 1| 0| 0| 0| 0| 1|
+-------+-------+-------+-------+-------+-------+-------+-----+
我正在将以下 json
文件读入 spark 中的 Dataframe:
{"id" : "a", "country" : "uk", "date" : "2016-01-01"}
{"id" : "b", "country" : "uk", "date" : "2016-01-02"}
{"id" : "c", "country" : "fr", "date" : "2016-02-01"}
{"id" : "d", "country" : "de", "date" : "2016-03-01"}
{"id" : "e", "country" : "tk", "date" : "2016-04-01"}
{"id" : "f", "country" : "be", "date" : "2016-05-01"}
{"id" : "g", "country" : "nl", "date" : "2016-06-01"}
{"id" : "h", "country" : "uk", "date" : "2016-06-01"}
然后我在其上应用 groupBy
并根据日期调整它,这是(伪)代码:
val df = spark.read.json("file.json")
val dfWithFormattedDate = df.withColumn("date", date_format(col("date"), "yyyy-MM"))
dfWithFormattedDate.groupBy("country").pivot("date").agg(countDistinct("id").alias("count")).orderBy("country")
这为我提供了 Dataframe
列,其中包含国家/地区和旋转日期(月)。然后我想 order
按总计数的降序排列结果。但是,我没有 count
作为列之一,在 groupBy 上应用 count()
后我无法应用 pivot
因为它 returns Dataset
和不是 RelationalGroupedDataset
。我也尝试了以下方法:
dfWithFormattedDate.groupBy("country").pivot("date").count()
这也不给我 count
列。有什么办法可以在结果数据集中同时提供 count
和旋转日期,以便我可以按计数降序排序?
更新
这是当前输出:
country|2016-01|2016-02|2016-03| ....
fr | null | 1 | null |
be | null | null | null |
uk | 2 | null | null |
这是预期的输出:
country|count|2016-01|2016-02|2016-03| ....
uk | 3 | 2 | null | null |
fr | 1 | null | 1 | null |
be | 1 | null | null | null |
如您所见,我需要结果中的 count
列,并按 count
的降序对行进行排序。没有明确包含 count
列的排序也可以。
如果我们的起点是这样DataFrame
:
import org.apache.spark.sql.functions.{date_format ,col, countDistinct}
val result = df.withColumn("date", date_format(col("date"), "yyyy-MM"))
.groupBy("country").pivot("date").agg(countDistinct("id").alias("count"))
.na.fill(0)
然后我们可以简单地计算除 country
列之外的所有列的行和:
import org.apache.spark.sql.functions.desc
val test = result.withColumn("count",
result.columns.filter(_!="country")
.map(c => col(c))
.reduce((x, y) => x + y))
.orderBy(desc("count"))
test.show()
+-------+-------+-------+-------+-------+-------+-------+-----+
|country|2016-01|2016-02|2016-03|2016-04|2016-05|2016-06|count|
+-------+-------+-------+-------+-------+-------+-------+-----+
| uk| 2| 0| 0| 0| 0| 1| 3|
| be| 0| 0| 0| 0| 1| 0| 1|
| de| 0| 0| 1| 0| 0| 0| 1|
| tk| 0| 0| 0| 1| 0| 0| 1|
| nl| 0| 0| 0| 0| 0| 1| 1|
| fr| 0| 1| 0| 0| 0| 0| 1|
+-------+-------+-------+-------+-------+-------+-------+-----+