如何拆分数据集(使用数据透视表)?
How to unstack dataset (using pivot)?
我在 larger stacked dataset 上尝试了 1.6 的新 "pivot" 功能。它有 5,656,458 行,IndicatorCode
列有 1344 个不同的代码。
我们的想法是使用 pivot 到 "unstack"(在 pandas 术语中)这个数据集,并为每个 IndicatorCode 有一列。
schema = StructType([ \
StructField("CountryName", StringType(), True), \
StructField("CountryCode", StringType(), True), \
StructField("IndicatorName", StringType(), True), \
StructField("IndicatorCode", StringType(), True), \
StructField("Year", IntegerType(), True), \
StructField("Value", DoubleType(), True) \
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
虽然成功返回,data3.first()
从未返回结果(我在 10 分钟后使用 3 个内核在我的独立设备上中断)。
我使用 RDD
和 aggregateByKey
的方法效果很好,所以我不是在寻找解决方案,而是在寻找使用 DataFrames 进行数据透视是否也能解决问题。
好吧,旋转通常不是一个非常有效的操作,使用 DataFrame
API 对此您无能为力。不过,您可以尝试的一件事是 repartition
您的数据:
(data2
.repartition("Year", "CountryCode")
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode2", columns)
.max("Value"))
甚至合计:
from pyspark.sql.functions import max
(df
.groupBy("Year", "CountryCode", "IndicatorCode")
.agg(max("Value").alias("Value"))
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode", columns)
.max("Value"))
申请前pivot
。两种解决方案背后的想法是相同的。而不是移动大的扩展 Rows
移动狭窄的密集数据并在本地扩展。
Spark 2.0 引入了SPARK-13749 一种对大量数据透视列值更快的数据透视实现。
在我的计算机上使用 Spark 2.1.0 进行测试,您的示例现在运行时间为 48 秒。
我在 larger stacked dataset 上尝试了 1.6 的新 "pivot" 功能。它有 5,656,458 行,IndicatorCode
列有 1344 个不同的代码。
我们的想法是使用 pivot 到 "unstack"(在 pandas 术语中)这个数据集,并为每个 IndicatorCode 有一列。
schema = StructType([ \
StructField("CountryName", StringType(), True), \
StructField("CountryCode", StringType(), True), \
StructField("IndicatorName", StringType(), True), \
StructField("IndicatorCode", StringType(), True), \
StructField("Year", IntegerType(), True), \
StructField("Value", DoubleType(), True) \
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
虽然成功返回,data3.first()
从未返回结果(我在 10 分钟后使用 3 个内核在我的独立设备上中断)。
我使用 RDD
和 aggregateByKey
的方法效果很好,所以我不是在寻找解决方案,而是在寻找使用 DataFrames 进行数据透视是否也能解决问题。
好吧,旋转通常不是一个非常有效的操作,使用 DataFrame
API 对此您无能为力。不过,您可以尝试的一件事是 repartition
您的数据:
(data2
.repartition("Year", "CountryCode")
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode2", columns)
.max("Value"))
甚至合计:
from pyspark.sql.functions import max
(df
.groupBy("Year", "CountryCode", "IndicatorCode")
.agg(max("Value").alias("Value"))
.groupBy("Year", "CountryCode")
.pivot("IndicatorCode", columns)
.max("Value"))
申请前pivot
。两种解决方案背后的想法是相同的。而不是移动大的扩展 Rows
移动狭窄的密集数据并在本地扩展。
Spark 2.0 引入了SPARK-13749 一种对大量数据透视列值更快的数据透视实现。
在我的计算机上使用 Spark 2.1.0 进行测试,您的示例现在运行时间为 48 秒。