pyspark - 使用 ArrayType 列进行折叠和求和
pyspark - fold and sum with ArrayType column
我正在尝试按元素求和,并且我已经创建了这个虚拟 df。输出应该是 [10,4,4,1]
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
data = [
("James",[1,1,1,1]),
("James",[2,1,1,0]),
("James",[3,1,1,0]),
("James",[4,1,1,0])
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("scores", ArrayType(IntegerType()), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
posexplode 有效,但我的真实 df 太大,所以我尝试使用 fold,但它给了我一个错误。有任何想法吗?谢谢!
vec_df = df.select("scores")
vec_sums = vec_df.rdd.fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
File "<ipython-input-115-9b470dedcfef>", line 2, in <listcomp>
TypeError: unsupported operand type(s) for +: 'int' and 'list'
您需要在 fold
之前将行的 RDD 映射到列表的 RDD:
vec_sums = vec_df.rdd.map(lambda x: x[0]).fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
为了帮助理解,您可以看看 RDD 的样子。
>>> vec_df.rdd.collect()
[Row(scores=[1, 1, 1, 1]), Row(scores=[2, 1, 1, 0]), Row(scores=[3, 1, 1, 0]), Row(scores=[4, 1, 1, 0])]
>>> vec_df.rdd.map(lambda x: x[0]).collect()
[[1, 1, 1, 1], [2, 1, 1, 0], [3, 1, 1, 0], [4, 1, 1, 0]]
所以你可以想象vec_df.rdd
包含一个嵌套列表,需要在fold
之前取消嵌套。
我正在尝试按元素求和,并且我已经创建了这个虚拟 df。输出应该是 [10,4,4,1]
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
data = [
("James",[1,1,1,1]),
("James",[2,1,1,0]),
("James",[3,1,1,0]),
("James",[4,1,1,0])
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("scores", ArrayType(IntegerType()), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
posexplode 有效,但我的真实 df 太大,所以我尝试使用 fold,但它给了我一个错误。有任何想法吗?谢谢!
vec_df = df.select("scores")
vec_sums = vec_df.rdd.fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
File "<ipython-input-115-9b470dedcfef>", line 2, in <listcomp>
TypeError: unsupported operand type(s) for +: 'int' and 'list'
您需要在 fold
之前将行的 RDD 映射到列表的 RDD:
vec_sums = vec_df.rdd.map(lambda x: x[0]).fold([0]*4, lambda a,b: [x + y for x, y in zip(a, b)])
为了帮助理解,您可以看看 RDD 的样子。
>>> vec_df.rdd.collect()
[Row(scores=[1, 1, 1, 1]), Row(scores=[2, 1, 1, 0]), Row(scores=[3, 1, 1, 0]), Row(scores=[4, 1, 1, 0])]
>>> vec_df.rdd.map(lambda x: x[0]).collect()
[[1, 1, 1, 1], [2, 1, 1, 0], [3, 1, 1, 0], [4, 1, 1, 0]]
所以你可以想象vec_df.rdd
包含一个嵌套列表,需要在fold
之前取消嵌套。