如何使用字典正确使用 reduce

How to properly use reduce with a dictionary

我正在使用自定义函数作为缩减操作的一部分。对于以下示例,我收到以下消息 TypeError: reduce() takes no keyword arguments - 我相信这是由于我在函数 exposed_colum 中使用字典 mapping 的方式所致 - 你能帮我解决这个问题吗功能?

from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from functools import reduce


def process_data(df: DataFrame):
    col_mapping = dict(zip(["name", "age"], ["a", "b"]))

    # Do other things...

    def exposed_column(df: DataFrame, mapping: dict):
        return df.select([col(c).alias(mapping.get(c, c)) for c in df.columns])

    return reduce(exposed_column, sequence=col_mapping, initial=df)


spark = SparkSession.builder.appName("app").getOrCreate()
l = [
    ("Bob", 25, "Spain"),
    ("Marc", 22, "France"),
    ("Steve", 20, "Belgium"),
    ("Donald", 26, "USA"),
]
rdd = spark.sparkContext.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]), country=x[2])).toDF()

people.show()
process_data(people).show()

people.show() 看起来像这样

+---+-------+------+
|age|country|  name|
+---+-------+------+
| 25|  Spain|   Bob|
| 22| France|  Marc|
| 20|Belgium| Steve|
| 26|    USA|Donald|
+---+-------+------+

这是预期的输出

+------+---+
|     a|  b|
+------+---+
|   Bob| 25|
|  Marc| 22|
| Steve| 20|
|Donald| 26|
+------+---+

reduce不带关键词,没错。 删除关键字后,您会注意到一个更严重的问题:当您遍历字典时,您只是在遍历它的键。因此,您尝试在其中批量重命名列的函数不会按照您的想法执行。

批量重命名列的一种方法是遍历字典的 items:

from typing import Mapping
from pyspark.sql import DataFrame

def rename_columns(frame: DataFrame, mapping: Mapping[str, str]) -> DataFrame:
    return reduce(lambda f, old_new: f.withColumnRenamed(old_new[0], old_new[1]),
                  mapping.items(), frame)

这允许您传入将列名映射到其他名称的字典(请注意 the recommendation 用于向参数添加类型提示是使用 Mapping,而不是 dict) .幸运的是,如果您尝试重命名不在 DataFrame 中的列,withColumnRenamed 不会抱怨,因此这等同于您的 mapping.get(c, c).

我在您的代码中没有注意到的一件事是它删除了 country 列。所以这仍然会在你的输出中。