(考虑缺失值的 pyspark 加权平均值
(py)spark weighted average taking account of missing values
是否有一种规范的方法来计算 pyspark 中的加权平均值而忽略分母总和中的缺失值?
举个例子:
# create data
data2 = [(1,1,1,1),
(1,None,1,2),
(2,1,1,1),
(2,3,1,2),
]
schema = (StructType([
StructField("group",IntegerType(),True),
StructField("var1",IntegerType(),True),
StructField("var2",IntegerType(),True),
StructField("wght", IntegerType(), True),
]))
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
+-----+----+----+----+
|group|var1|var2|wght|
+-----+----+----+----+
|1 |1 |1 |1 |
|1 |null|1 |2 |
|2 |1 |1 |1 |
|2 |3 |1 |2 |
+-----+----+----+----+
我可以计算加权平均值,如其他地方所述:
(df.groupBy("group").agg(
(F.sum(col("var1")*col("wght"))/F.sum("wght")).alias("wgtd_var1"),
(F.sum(col("var2")*col("wght"))/F.sum("wght")).alias("wgtd_var2")).show(truncate=False))
+-----+------------------+---------+
|group|wgtd_var1 |wgtd_var2|
+-----+------------------+---------+
|1 |0.3333333333333333|1.0 |
|2 |2.3333333333333335|1.0 |
+-----+------------------+---------+
但问题是对于第 1 组,加权平均值应该是 1,因为不应使用第二个观察值。我可以
# get new weights
df = (df.withColumn("wghtvar1", F.when(col("var1").isNull(), None)
.otherwise(col("wght")))
.withColumn("wghtvar2", F.when(col("var2").isNull(), None)
.otherwise(col("wght"))))
# compute correct weighted average
(df.groupBy("group").agg(
(F.sum(col("var1")*col("wghtvar1"))/F.sum("wghtvar1")).alias("wgtd_var1"),
(F.sum(col("var2")*col("wghtvar2"))/F.sum("wghtvar2")).alias("wgtd_var2")).show(truncate=False))
+-----+------------------+---------+
|group|wgtd_var1 |wgtd_var2|
+-----+------------------+---------+
|1 |1.0 |1.0 |
|2 |2.3333333333333335|1.0 |
+-----+------------------+---------+
有规范的方法吗?
差别不大,但至少可以避免为每个变量创建新的 wght 列。
条件聚合。
df = (df.groupby('group')
.agg(
(F.sum(F.when(F.col('var1').isNotNull(), F.col('var1') * F.col('wght')))
/
(F.sum(F.when(F.col('var1').isNotNull(), F.col('wght'))))
).alias('wgtd_var1')
))
为了将此应用于多个 var
,我可以使用列表理解。
df = (df.groupby('group')
.agg(*[
(F.sum(F.when(F.col(x).isNotNull(), F.col(x) * F.col('wght')))
/
(F.sum(F.when(F.col(x).isNotNull(), F.col('wght'))))
).alias(f'wgtd_{x}')
for x in ['var1', 'var2']
]))
是否有一种规范的方法来计算 pyspark 中的加权平均值而忽略分母总和中的缺失值?
举个例子:
# create data
data2 = [(1,1,1,1),
(1,None,1,2),
(2,1,1,1),
(2,3,1,2),
]
schema = (StructType([
StructField("group",IntegerType(),True),
StructField("var1",IntegerType(),True),
StructField("var2",IntegerType(),True),
StructField("wght", IntegerType(), True),
]))
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
+-----+----+----+----+
|group|var1|var2|wght|
+-----+----+----+----+
|1 |1 |1 |1 |
|1 |null|1 |2 |
|2 |1 |1 |1 |
|2 |3 |1 |2 |
+-----+----+----+----+
我可以计算加权平均值,如其他地方所述:
(df.groupBy("group").agg(
(F.sum(col("var1")*col("wght"))/F.sum("wght")).alias("wgtd_var1"),
(F.sum(col("var2")*col("wght"))/F.sum("wght")).alias("wgtd_var2")).show(truncate=False))
+-----+------------------+---------+
|group|wgtd_var1 |wgtd_var2|
+-----+------------------+---------+
|1 |0.3333333333333333|1.0 |
|2 |2.3333333333333335|1.0 |
+-----+------------------+---------+
但问题是对于第 1 组,加权平均值应该是 1,因为不应使用第二个观察值。我可以
# get new weights
df = (df.withColumn("wghtvar1", F.when(col("var1").isNull(), None)
.otherwise(col("wght")))
.withColumn("wghtvar2", F.when(col("var2").isNull(), None)
.otherwise(col("wght"))))
# compute correct weighted average
(df.groupBy("group").agg(
(F.sum(col("var1")*col("wghtvar1"))/F.sum("wghtvar1")).alias("wgtd_var1"),
(F.sum(col("var2")*col("wghtvar2"))/F.sum("wghtvar2")).alias("wgtd_var2")).show(truncate=False))
+-----+------------------+---------+
|group|wgtd_var1 |wgtd_var2|
+-----+------------------+---------+
|1 |1.0 |1.0 |
|2 |2.3333333333333335|1.0 |
+-----+------------------+---------+
有规范的方法吗?
差别不大,但至少可以避免为每个变量创建新的 wght 列。
条件聚合。
df = (df.groupby('group')
.agg(
(F.sum(F.when(F.col('var1').isNotNull(), F.col('var1') * F.col('wght')))
/
(F.sum(F.when(F.col('var1').isNotNull(), F.col('wght'))))
).alias('wgtd_var1')
))
为了将此应用于多个 var
,我可以使用列表理解。
df = (df.groupby('group')
.agg(*[
(F.sum(F.when(F.col(x).isNotNull(), F.col(x) * F.col('wght')))
/
(F.sum(F.when(F.col(x).isNotNull(), F.col('wght'))))
).alias(f'wgtd_{x}')
for x in ['var1', 'var2']
]))