将列添加到 PySpark 数据框包含基于对另外两个列的分组的列的标准偏差
Adding a column to a PySpark dataframe contans standard deviations of a column based on the grouping on two another columns
假设我们有一个 csv 文件已作为数据框导入 PysPark,如下所示
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("file path and name.csv", inferSchema = True, header = True)
df.show()
output
+-----+----+----+
|lable|year|val |
+-----+----+----+
| A|2003| 5.0|
| A|2003| 6.0|
| A|2003| 3.0|
| A|2004|null|
| B|2000| 2.0|
| B|2000|null|
| B|2009| 1.0|
| B|2000| 6.0|
| B|2009| 6.0|
+-----+----+----+
现在,我们要向 df
添加另一列,其中包含基于 lable
和 year
两列分组的标准差 val
。因此,输出必须如下所示:
+-----+----+----+-----+
|lable|year|val | std |
+-----+----+----+-----+
| A|2003| 5.0| 1.53|
| A|2003| 6.0| 1.53|
| A|2003| 3.0| 1.53|
| A|2004|null| null|
| B|2000| 2.0| 2.83|
| B|2000|null| 2.83|
| B|2009| 1.0| 3.54|
| B|2000| 6.0| 2.83|
| B|2009| 6.0| 3.54|
+-----+----+----+-----+
我有以下代码适用于小型数据框,但不适用于我现在正在使用的非常大的数据框(约 4000 万行)。
import pyspark.sql.functions as f
a = df.groupby('lable','year').agg(f.round(f.stddev("val"),2).alias('std'))
df = df.join(a, on = ['lable', 'year'], how = 'inner')
我在我的大型数据帧上 运行 后收到 Py4JJavaError Traceback (most recent call last)
错误。
有人知道其他方法吗?我希望你的方法适用于我的数据集。
我正在使用 python3.7.1
、pyspark2.4
和 jupyter4.4.0
dataframe 上的连接导致执行器之间的大量数据混洗。在您的情况下,您可以不加入。
使用 window 规范按 'lable' 和 'year' 对数据进行分区并在 window.
上聚合
from pyspark.sql.window import *
windowSpec = Window.partitionBy('lable','year')\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn("std", f.round(f.stddev("val").over(windowSpec), 2))
假设我们有一个 csv 文件已作为数据框导入 PysPark,如下所示
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("file path and name.csv", inferSchema = True, header = True)
df.show()
output
+-----+----+----+
|lable|year|val |
+-----+----+----+
| A|2003| 5.0|
| A|2003| 6.0|
| A|2003| 3.0|
| A|2004|null|
| B|2000| 2.0|
| B|2000|null|
| B|2009| 1.0|
| B|2000| 6.0|
| B|2009| 6.0|
+-----+----+----+
现在,我们要向 df
添加另一列,其中包含基于 lable
和 year
两列分组的标准差 val
。因此,输出必须如下所示:
+-----+----+----+-----+
|lable|year|val | std |
+-----+----+----+-----+
| A|2003| 5.0| 1.53|
| A|2003| 6.0| 1.53|
| A|2003| 3.0| 1.53|
| A|2004|null| null|
| B|2000| 2.0| 2.83|
| B|2000|null| 2.83|
| B|2009| 1.0| 3.54|
| B|2000| 6.0| 2.83|
| B|2009| 6.0| 3.54|
+-----+----+----+-----+
我有以下代码适用于小型数据框,但不适用于我现在正在使用的非常大的数据框(约 4000 万行)。
import pyspark.sql.functions as f
a = df.groupby('lable','year').agg(f.round(f.stddev("val"),2).alias('std'))
df = df.join(a, on = ['lable', 'year'], how = 'inner')
我在我的大型数据帧上 运行 后收到 Py4JJavaError Traceback (most recent call last)
错误。
有人知道其他方法吗?我希望你的方法适用于我的数据集。
我正在使用 python3.7.1
、pyspark2.4
和 jupyter4.4.0
dataframe 上的连接导致执行器之间的大量数据混洗。在您的情况下,您可以不加入。 使用 window 规范按 'lable' 和 'year' 对数据进行分区并在 window.
上聚合from pyspark.sql.window import *
windowSpec = Window.partitionBy('lable','year')\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn("std", f.round(f.stddev("val").over(windowSpec), 2))