PySpark MapType 从列值到列名数组

PySpark MapType from column values to array of column name

我有 roles 的数据框和 ids 扮演这些角色的人。在下面的table中,角色是a,b,c,d,人物是a3,36,79,38

我想要的是一张人与角色数组的映射,如 table 右侧所示。

+---+----+----+---+---+--------+
|rec|   a|   b|  c|  d|    ppl |  pplmap
+---+----+----+---+---+--------+-------------------------------------
|  D|  a3|  36| 36| 36|[a3, 36]| [ a3 -> ['a'], 36 -> ['b','c','d'] ]  
|  E|  a3|  79| 79| a3|[a3, 79]| [ a3 -> ['a','d'], 79 -> ['b','c'] ]
|  F|null|null| 38| 38|    [38]| [ 38 -> ['c','d'] ]
+---+----+----+---+---+--------+

实际上,我真正想要的是一份易读的报告,例如:

D
  a3 roles: a
  36 roles: b, c, d
E
  a3 roles: a, d
  79 roles: b, c
F
  38 roles: c, d

我正在使用 PySpark 3。

有什么建议吗?谢谢!!

设置:

import pandas as pd     
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

df = pd.DataFrame({
    'rec': list('DEF'),
    'a': ['a3', 'a3', None],
    'b': [36, 79, None],
    'c': [36, 79, 38],
    'd': [36, 55, 38]
})
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(df) 

然后相应的DataFrame,按值分组并按键聚合:

cols_to_melt = list('abcd')
res = df.withColumn(
    "tmp",
    explode(array(
        [struct(lit(c).alias('key'), col(c).alias('val'))
        for c in cols_to_melt]))) \
    .select('rec', col('tmp.key'), col('tmp.val')) \
    .dropna() \
    .groupby(['rec', 'val']) \
    .agg(collect_list('key').alias('keys')) \
    .groupby('rec') \
    .agg(map_from_entries(collect_list(struct("val","keys"))).alias('maps'))
res.show(truncate=False)

输出:

+---+----------------------------------------------+
|rec|maps                                          |
+---+----------------------------------------------+
|F  |{38 -> [c, d], NaN -> [b]}                    |
|E  |{79 -> [c], 79.0 -> [b], a3 -> [a], 55 -> [d]}|
|D  |{36.0 -> [b], a3 -> [a], 36 -> [c, d]}        |
+---+----------------------------------------------+

要获取报告,您只需遍历收集的数据:

for row in res.collect():
  print(row.rec)
  print('\n'.join(f"  {k} roles: {', '.join(v)}" for k, v in row.maps.items()))

那么您的最终报告应该如下所示:

F
  38 roles: c, d
  NaN roles: b
E
  55 roles: d
  79 roles: c
  a3 roles: a
  79.0 roles: b
D
  36.0 roles: b
  a3 roles: a
  36 roles: c, d

我在这里没有处理的一个问题是您的其中一列同时包含数字和字符串值,这在 spark 中是不可能的。

如果您要将 pandas DataFrame 转换为 spark DataFrame(就像我在示例中所做的那样),您应该传递一个明确的 .

如果您正在从 CSV 文件中读取数据,您可能不必这样做 - 类型将自动推断为 String

但是,在这种情况下,为了对某些具有 38 和其他 "38" 值的列进行分组,您应该确保所有相关的数字列也都转换为 String.

因此,在任何情况下,最好使用模式来确保您在 DataFrame 中准确获得所需的类型。

您可以首先对数据框进行逆透视,然后使用一些 groupby 构建您想要的地图列。

输入数据帧:

data = [
    ("D", "a3", "36", "36", "36", ["a3", "36"]),
    ("E", "a3", "79", "79", "a3", ["a3", "79"]),
    ("F", None, None, "38", "38", ["38"]),
]

df = spark.createDataFrame(data, ["id", "a", "b", "c", "d", "ppl"])

分组后使用stack函数反轴和map_from_entries:

import pyspark.sql.functions as F

df1 = df.selectExpr(
    "id",
    "stack(4, 'a', a, 'b', b, 'c', c, 'd', d) as (role, person)"
).filter(
    "person is not null"
).groupBy("id", "person").agg(
    F.collect_list("role").alias("roles")
).groupBy("id").agg(
    F.map_from_entries(
        F.collect_list(F.struct(F.col("person"), F.col("roles")))
    ).alias("pplmap")
)

df1.show(truncate=False)
#+---+----------------------------+
#|id |pplmap                      |
#+---+----------------------------+
#|F  |{38 -> [c, d]}              |
#|E  |{79 -> [b, c], a3 -> [a, d]}|
#|D  |{a3 -> [a], 36 -> [b, c, d]}|
#+---+----------------------------+

如果你想动态生成栈表达式(如果你有很多角色列),你可以看我的另一个答案