如何将不同的模式应用于单个数据集中的 csvs?

How to apply different schemas to csvs within a single dataset?

我从一个大的 csvs zip 文件开始,我在 Palantir Foundry 中解压了它。

我现在有一个由多个 csvs(每年一个)组成的数据集,其中 csvs 几乎是相同的模式,但有一些差异。如何将架构单独应用于每个 csvs 或规范化它们之间的架构?

如果你的文件被解压缩并且只是作为 .csvs 在你的数据集中,你可以使用 Spark 的本地 spark_session.read.csv 方法类似于我对 .[=17= 的回答]

这将如下所示:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('csv').load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

请注意,union_many 动词会将您的模式堆叠在一起,因此如果您有许多 many 具有不同模式的文件,许多行将为空因为它们只会存在于一个文件中。

如果您知道每个模式的公共字段,并且知道只有一列会在文件之间更改名称,您可以更改逻辑以重命名 parsed_df 中的列以协调模式。这将取决于您希望对架构执行多少要求。

我还会包括一个与其他方法相同的测试方法,以便您可以快速验证正确的解析行为。