Pyspark and Graphframes: Aggregate messages power mean
Given the following graph:

Where A has a value of 20, B has a value of 5, and C has a value of 10, I would like to use pyspark/graphframes to compute the power mean. That is:

result = (norm_factor * (x_1^p + x_2^p + ... + x_n^p))^(1/p)

In this case n is the number of items (3 in our case: the three vertices at A, including A itself), our p is taken to be n * 2, and the normalization factor is 1/n, or 1/3. So the resulting value for A should be:

n = 3
norm_factor = 1/n
p = n * 2
result = (norm_factor * (20^p + 5^p + 10^p))^(1/p) = 16.697421658890875
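For reference, a quick sanity check of that arithmetic in plain Python (no Spark involved):

n = 3
norm_factor = 1 / n
p = n * 2
result = (norm_factor * (20**p + 5**p + 10**p)) ** (1 / p)
print(result)  # 16.697421658890875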
So the question is: how do I compute this with pyspark/graphframes? I have the following graph:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName('get-the-power').getOrCreate()

vertices = spark.createDataFrame([('1', 'A', 20),
                                  ('2', 'B', 5),
                                  ('3', 'C', 10)],
                                 ['id', 'name', 'value'])

edges = spark.createDataFrame([('1', '2'),
                               ('1', '3')],
                              ['src', 'dst'])

g = GraphFrame(vertices, edges)
I'm assuming I need to aggregate the values of the child vertices, and I've been playing around with message aggregation:
from pyspark.sql.functions import sum as sqlsum
from graphframes.lib import AggregateMessages as AM

agg = g.aggregateMessages(
    sqlsum(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=AM.dst['value'])
agg.show()
This results in:
+---+----------+
| id|totalValue|
+---+----------+
|  3|        10|
|  1|        15|
|  2|         5|
+---+----------+
How can I replace totalValue (the sqlsum) with a power mean? Surely there must be a way to do this with the Spark functions available in pyspark?
--- UPDATE ---

It looks like I can approximate this with a UDF:
from pyspark.sql import functions as func
from pyspark.sql.functions import array, col, collect_list, concat
from pyspark.sql.types import DoubleType

def power_mean(values):
    n = len(values)
    norm_factor = 1 / n
    p = n * 2
    return (norm_factor * sum([x**p for x in values])) ** (1 / p)

udf_power_mean = func.udf(power_mean, returnType=DoubleType())

# Aggregate the values from child vertices, as I was doing before.
agg = g.aggregateMessages(
    collect_list(AM.msg).alias("totalValue"),
    sendToSrc=AM.dst['value'],
    sendToDst=None)

# `concat` the value for this vertex with its children's values.
# We end up with an `array<int>` that we then pass to `udf_power_mean`.
new_vertices = agg.join(vertices, vertices.id == agg.id, "left") \
    .select(vertices.id,
            'name',
            'value',
            concat(array(col('value')), 'totalValue').alias("allValues")) \
    .withColumn('totalScore', udf_power_mean(col('allValues'))) \
    .drop('allValues')

new_vertices.show()
This produces:
+---+----+-----+------------------+
| id|name|value|        totalScore|
+---+----+-----+------------------+
|  1|   A|   20|16.697421658890875|
+---+----+-----+------------------+
Is there a way to do this without a UDF? Just plain Spark functions?
For Spark 2.4+, you can use the aggregate function.

Simple version:
power_mean = lambda col: func.expr(f"""
    aggregate(
        `{col}`,
        0D,
        (acc, x) -> acc + power(x, 2*size(`{col}`)),
        acc -> power(acc/size(`{col}`), 0.5/size(`{col}`))
    )
""")
One issue with the above solution is that if any array element is NULL, the resulting totalScore will be NULL. To avoid this, you can do the following:
power_mean = lambda col: func.expr(f"""
    aggregate(
        /* expr: array column to iterate through */
        `{col}`,
        /* start: set the zero value and the accumulator as a struct<psum:double,n:int> */
        (0D as psum, size(filter(`{col}`, x -> x is not null)) as n),
        /* merge: calculate `sum([(x)**p for x in values])` */
        (acc, x) -> (acc.psum + power(coalesce(x,0), 2*acc.n) as psum, acc.n as n),
        /* finish: post-processing */
        acc -> power(acc.psum/acc.n, 0.5/acc.n)
    )
""")
It depends on how you want to define n: the above skips NULLs when counting n. If you want to count them instead, just change the second argument from:

(0D as psum, size(filter(`{col}`, x -> x is not null)) as n),

to:

(0D as psum, size(`{col}`) as n),
Example:
df = spark.createDataFrame([([20,5,None,10],)],['value'])
df.select("value", power_mean("value").alias('totalScore')).show(truncate=False)
+------------+------------------+
|value       |totalScore        |
+------------+------------------+
|[20, 5,, 10]|16.697421658984894|
+------------+------------------+
BTW, if you want to concat() the totalScore with other columns even when those columns contain NULL values, just use the coalesce() function, or concat_ws() if possible.
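For instance, a minimal sketch of that NULL behavior (the demo column names x and y are made up here):

from pyspark.sql import functions as F

demo = spark.createDataFrame([("a", None)], "x string, y string")
demo.select(
    F.concat("x", "y").alias("plain"),                              # NULL: concat() yields NULL if any input is NULL
    F.concat("x", F.coalesce(F.col("y"), F.lit(""))).alias("safe"), # "a": coalesce() substitutes a default
    F.concat_ws("-", "x", "y").alias("ws")                          # "a": concat_ws() skips NULL inputs
).show()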