reduce 方法是否需要初始化程序?

Is reduce method requiring initializer?

我正在尝试将任意数量的 PySpark 数据帧附加在一起。 这是尝试使用下面的 union_all 函数:

from functools import reduce
from pyspark.sql import DataFrame


def union_all(*dfs):
    return reduce(DataFrame.union, dfs)

计算时,returns一个TypeError:

TypeError: reduce() of empty sequence with no initial value

以下线程涵盖相同的 TypeError 但针对不同的情况(整数范围上的 lambda 函数):

how to fix reduce() of empty sequence with no initial value error?

根据该讨论,解决方案是为 reduce 函数提供初始化程序。 在我的例子中,这可能是一个 PySpark 数据框,所以我使用我的数据框列表中的第一个元素:

def union_all(*dfs):
    return reduce(DataFrame.union, dfs, dfs[0])

调用上述函数会抛出 IndexError:

IndexError: tuple index out of range

这是什么意思,如何解决?

我正在使用的数据(dfs): dfs中的每个元素都是一个dataframe,相同的列以相同的顺序排列(总共有3个dataframe)。这是其中一个数据框作为示例:

DataFrame[id: bigint, index: int, sn: string, sid: string, dt: string, ps: string, fr: string, hr: 
 string, pn: string, aid: string, mf: string, mn: string]

您似乎将空序列作为 dfs 参数传递给 union_all 函数。初始值作为减少操作的第一个元素包含在序列中。 https://docs.python.org/3/library/functools.html#functools.reduce

If the optional initializer is present, it is placed before the items of the iterable in the calculation, and serves as a default when the iterable is empty.

因此,当您提供空序列时,如果您不想出现任何错误,则必须指定初始化程序,因此在这种情况下它对应于一个空数据帧。但是,这是没有意义的,因为您没有架构信息来创建适当的数据框。因此,最好验证给定参数是否具有值并检查给定数据帧是否具有相同的联合操作模式。例子;

from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame

sparkSession = SparkSession.builder.getOrCreate()

# example schema
struct = StructType()
struct.add("a", "integer")
struct.add("b", "integer")
struct.add("c", "integer")

# example dataframes
df_1 = sparkSession.createDataFrame(data=[(1, 2, 3), ], schema=struct)
df_2 = sparkSession.createDataFrame(data=[(10, 20, 30), ], schema=struct)
df_3 = sparkSession.createDataFrame(data=[(100, 200, 300), ], schema=struct)


def union_dfs(*dfs):
    if not dfs:  # check dfs tuple is non-empty
        raise ValueError("At least one dataframe must be provided")

    schemas = [df.schema for df in dfs]
    if len(set(schemas)) != 1:  # validate all dfs have the same schema
        raise ValueError("Each of df's schema must be the same")

    result_df = reduce(DataFrame.union, dfs)

    return result_df

union_df = union_dfs(df_1, df_2, df_3)

union_df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
| 10| 20| 30|
|100|200|300|
+---+---+---+