Pyspark 将 StructType 列聚合为每一行的元素数组
Pyspark aggregate a StructType column as an Array of its elements for each line
我正在尝试做一些看起来非常简单但不知何故不知道如何使用 pyspark 来完成的事情。
我有一个包含两列的 df(为了简化)'id' 和 'strcol',可能有重复的 ID
我想做一个 df.groupBy('id'),它将 return 为每个 id 一个 strcol 值的数组
简单的例子:
|--id--|--strCol--|
| a | {'a':1} |
| a | {'a':2} |
| b | {'b':3} |
| b | {'b':4} |
|------|----------|
would become
|--id--|-------aggsStr------|
| a | [{'a':1},{'a':2}] |
| b | [{'b':3},{'b':4}] |
|------|--------------------|
我尝试将 apply 与 pandas udf 一起使用,但它似乎拒绝 return 数组。 (或者我没有正确使用它)
您可以使用 pyspark.sql.functions
模块中的 collect_list
:
from pyspark.sql import functions as F
agg = df.groupby("id").agg(F.collect_list("strCol"))
一个功能齐全的例子:
import pandas as pd
from pyspark.sql import functions as F
data = {'id': ['a', 'a', 'b', 'b'], 'strCol': [{'a':1}, {'a':2}, {'b':3}, {'b':4}]}
df_aux = pd.DataFrame(data)
# df type: DataFrame[id: string, strCol: map<string,bigint>]
df = spark.createDataFrame(df_aux)
# agg type: # DataFrame[id: string, collect_list(strCol): array<map<string,bigint>>]
agg = df.groupby("id").agg(F.collect_list("strCol"))
希望对您有所帮助!
我正在尝试做一些看起来非常简单但不知何故不知道如何使用 pyspark 来完成的事情。
我有一个包含两列的 df(为了简化)'id' 和 'strcol',可能有重复的 ID
我想做一个 df.groupBy('id'),它将 return 为每个 id 一个 strcol 值的数组
简单的例子:
|--id--|--strCol--|
| a | {'a':1} |
| a | {'a':2} |
| b | {'b':3} |
| b | {'b':4} |
|------|----------|
would become
|--id--|-------aggsStr------|
| a | [{'a':1},{'a':2}] |
| b | [{'b':3},{'b':4}] |
|------|--------------------|
我尝试将 apply 与 pandas udf 一起使用,但它似乎拒绝 return 数组。 (或者我没有正确使用它)
您可以使用 pyspark.sql.functions
模块中的 collect_list
:
from pyspark.sql import functions as F
agg = df.groupby("id").agg(F.collect_list("strCol"))
一个功能齐全的例子:
import pandas as pd
from pyspark.sql import functions as F
data = {'id': ['a', 'a', 'b', 'b'], 'strCol': [{'a':1}, {'a':2}, {'b':3}, {'b':4}]}
df_aux = pd.DataFrame(data)
# df type: DataFrame[id: string, strCol: map<string,bigint>]
df = spark.createDataFrame(df_aux)
# agg type: # DataFrame[id: string, collect_list(strCol): array<map<string,bigint>>]
agg = df.groupby("id").agg(F.collect_list("strCol"))
希望对您有所帮助!