在 PySpark 数据框中添加列总和作为新列
Add column sum as new column in PySpark dataframe
我正在使用 PySpark 并且我有一个带有一堆数字列的 Spark 数据框。我想添加一列,它是所有其他列的总和。
假设我的数据框有 "a"、"b" 和 "c" 列。我知道我可以做到:
df.withColumn('total_col', df.a + df.b + df.c)
问题是我不想单独输入每一列并添加它们,尤其是当我有很多列时。我希望能够自动或通过指定要添加的列名列表来执行此操作。还有其他方法吗?
这并不明显。我看不到 spark Dataframes API.
中定义的基于行的列总和
版本 2
这可以通过一种相当简单的方式完成:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
df.columns
由 pyspark 提供,作为字符串列表提供 Spark Dataframe 中的所有列名称。对于不同的总和,您可以提供任何其他列名列表。
我没有尝试将此作为我的第一个解决方案,因为我不确定它的行为方式。但它有效。
版本 1
这太复杂了,但效果也不错。
你可以这样做:
- 使用
df.columns
获取列名列表
- 使用该名称列表来制作列列表
- 将该列表传递给将在 fold-type functional manner
中调用该列的重载添加函数的对象
用 python 的 reduce, some knowledge of how operator overloading works, and the pyspark code for columns here 变成:
def column_add(a,b):
return a.__add__(b)
newdf = df.withColumn('total_col',
reduce(column_add, ( df[col] for col in df.columns ) ))
请注意,这是一个 python reduce,而不是 spark RDD reduce,并且要 reduce 的第二个参数中的括号项需要括号,因为它是一个列表生成器表达式。
已测试,有效!
$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
... return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
我的问题与上述类似(有点复杂),因为我必须添加 连续 列总和作为 PySpark 数据框中的新列。此方法使用上面 Paul 的版本 1 中的代码:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
,(6,1,-4),(0,2,-2),(6,4,1)\
,(4,5,2),(5,-3,-5),(6,4,-1)]\
,schema=['x1','x2','x3'])
df.show()
+---+---+---+
| x1| x2| x3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 3| 2| 1|
| 6| 1| -4|
| 0| 2| -2|
| 6| 4| 1|
| 4| 5| 2|
| 5| -3| -5|
| 6| 4| -1|
+---+---+---+
colnames=df.columns
添加累加和(连续)的新列:
for i in range(0,len(colnames)):
colnameLst= colnames[0:i+1]
colname = 'cm'+ str(i+1)
df = df.withColumn(colname, sum(df[col] for col in colnameLst))
df.show()
+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
| 1| 2| 3| 1| 3| 6|
| 4| 5| 6| 4| 9| 15|
| 3| 2| 1| 3| 5| 6|
| 6| 1| -4| 6| 7| 3|
| 0| 2| -2| 0| 2| 0|
| 6| 4| 1| 6| 10| 11|
| 4| 5| 2| 4| 9| 11|
| 5| -3| -5| 5| 2| -3|
| 6| 4| -1| 6| 10| 9|
+---+---+---+---+---+---+
'cumulative sum'添加的栏目如下:
cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
解决方案
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
@Paul 发表作品。尽管如此,我还是遇到了错误,正如我所看到的那样,
TypeError: 'Column' object is not callable
一段时间后我发现了问题(至少在我的情况下)。问题是我之前使用
行导入了一些 pyspark 函数
from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min
所以该行导入了 sum
pyspark 命令,而 df.withColumn('total', sum(df[col] for col in df.columns))
应该使用正常的 python sum
函数。
您可以使用 del sum
删除 pyspark 函数的引用。
否则在我的情况下,我将导入更改为
import pyspark.sql.functions as F
然后将函数引用为 F.sum
。
最直接的方法是使用 expr
函数
from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))
df.show()
+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
| linha1| valor1| 2|
| linha2| valor2| 5|
+--------+--------+--------+
df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()
+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
| linha1| valor1| 2| 1.0|
| linha2| valor2| 5| 2.5|
+--------+--------+--------+--------------+
df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()
+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
| linha1| valor1| 2| 1.0| 3.0|
| linha2| valor2| 5| 2.5| 7.5|
+--------+--------+--------+--------------+------------+
一个非常简单的方法是只使用 select 而不是 withcolumn 如下:
df = df.select('*', (col("a")+col("b")+col('c).alias("total"))
这应该会为您提供所需的金额,并根据要求进行细微更改
将列表中的多列合计为一列
PySpark 的 sum
函数不支持列添加。
这可以使用 expr
函数来实现。
from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))
这为我们提供了所需的列总和。
以下方法对我有用:
- 导入 pyspark sql 函数
from pyspark.sql 将函数导入为 F
- 使用F.expr(list_of_columns)
data_frame.withColumn('Total_Sum',F.expr('col_name1+col_name2+..col_namen)
我正在使用 PySpark 并且我有一个带有一堆数字列的 Spark 数据框。我想添加一列,它是所有其他列的总和。
假设我的数据框有 "a"、"b" 和 "c" 列。我知道我可以做到:
df.withColumn('total_col', df.a + df.b + df.c)
问题是我不想单独输入每一列并添加它们,尤其是当我有很多列时。我希望能够自动或通过指定要添加的列名列表来执行此操作。还有其他方法吗?
这并不明显。我看不到 spark Dataframes API.
中定义的基于行的列总和版本 2
这可以通过一种相当简单的方式完成:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
df.columns
由 pyspark 提供,作为字符串列表提供 Spark Dataframe 中的所有列名称。对于不同的总和,您可以提供任何其他列名列表。
我没有尝试将此作为我的第一个解决方案,因为我不确定它的行为方式。但它有效。
版本 1
这太复杂了,但效果也不错。
你可以这样做:
- 使用
df.columns
获取列名列表 - 使用该名称列表来制作列列表
- 将该列表传递给将在 fold-type functional manner 中调用该列的重载添加函数的对象
用 python 的 reduce, some knowledge of how operator overloading works, and the pyspark code for columns here 变成:
def column_add(a,b):
return a.__add__(b)
newdf = df.withColumn('total_col',
reduce(column_add, ( df[col] for col in df.columns ) ))
请注意,这是一个 python reduce,而不是 spark RDD reduce,并且要 reduce 的第二个参数中的括号项需要括号,因为它是一个列表生成器表达式。
已测试,有效!
$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
... return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
我的问题与上述类似(有点复杂),因为我必须添加 连续 列总和作为 PySpark 数据框中的新列。此方法使用上面 Paul 的版本 1 中的代码:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
,(6,1,-4),(0,2,-2),(6,4,1)\
,(4,5,2),(5,-3,-5),(6,4,-1)]\
,schema=['x1','x2','x3'])
df.show()
+---+---+---+
| x1| x2| x3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 3| 2| 1|
| 6| 1| -4|
| 0| 2| -2|
| 6| 4| 1|
| 4| 5| 2|
| 5| -3| -5|
| 6| 4| -1|
+---+---+---+
colnames=df.columns
添加累加和(连续)的新列:
for i in range(0,len(colnames)):
colnameLst= colnames[0:i+1]
colname = 'cm'+ str(i+1)
df = df.withColumn(colname, sum(df[col] for col in colnameLst))
df.show()
+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
| 1| 2| 3| 1| 3| 6|
| 4| 5| 6| 4| 9| 15|
| 3| 2| 1| 3| 5| 6|
| 6| 1| -4| 6| 7| 3|
| 0| 2| -2| 0| 2| 0|
| 6| 4| 1| 6| 10| 11|
| 4| 5| 2| 4| 9| 11|
| 5| -3| -5| 5| 2| -3|
| 6| 4| -1| 6| 10| 9|
+---+---+---+---+---+---+
'cumulative sum'添加的栏目如下:
cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
解决方案
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
@Paul 发表作品。尽管如此,我还是遇到了错误,正如我所看到的那样,
TypeError: 'Column' object is not callable
一段时间后我发现了问题(至少在我的情况下)。问题是我之前使用
行导入了一些 pyspark 函数from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min
所以该行导入了 sum
pyspark 命令,而 df.withColumn('total', sum(df[col] for col in df.columns))
应该使用正常的 python sum
函数。
您可以使用 del sum
删除 pyspark 函数的引用。
否则在我的情况下,我将导入更改为
import pyspark.sql.functions as F
然后将函数引用为 F.sum
。
最直接的方法是使用 expr
函数
from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))
df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))
df.show()
+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
| linha1| valor1| 2|
| linha2| valor2| 5|
+--------+--------+--------+
df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()
+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
| linha1| valor1| 2| 1.0|
| linha2| valor2| 5| 2.5|
+--------+--------+--------+--------------+
df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()
+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
| linha1| valor1| 2| 1.0| 3.0|
| linha2| valor2| 5| 2.5| 7.5|
+--------+--------+--------+--------------+------------+
一个非常简单的方法是只使用 select 而不是 withcolumn 如下:
df = df.select('*', (col("a")+col("b")+col('c).alias("total"))
这应该会为您提供所需的金额,并根据要求进行细微更改
将列表中的多列合计为一列
PySpark 的 sum
函数不支持列添加。
这可以使用 expr
函数来实现。
from pyspark.sql.functions import expr
cols_list = ['a', 'b', 'c']
# Creating an addition expression using `join`
expression = '+'.join(cols_list)
df = df.withColumn('sum_cols', expr(expression))
这为我们提供了所需的列总和。
以下方法对我有用:
- 导入 pyspark sql 函数
from pyspark.sql 将函数导入为 F - 使用F.expr(list_of_columns)
data_frame.withColumn('Total_Sum',F.expr('col_name1+col_name2+..col_namen)