如何将不同的模式应用于单个数据集中的 csvs?
How to apply different schemas to csvs within a single dataset?
我从一个大的 csvs zip 文件开始,我在 Palantir Foundry 中解压了它。
我现在有一个由多个 csvs(每年一个)组成的数据集,其中 csvs 几乎是相同的模式,但有一些差异。如何将架构单独应用于每个 csvs 或规范化它们之间的架构?
如果你的文件被解压缩并且只是作为 .csv
s 在你的数据集中,你可以使用 Spark 的本地 spark_session.read.csv
方法类似于我对 .[=17= 的回答]
这将如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('csv').load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
请注意,union_many
动词会将您的模式堆叠在一起,因此如果您有许多 many 具有不同模式的文件,许多行将为空因为它们只会存在于一个文件中。
如果您知道每个模式的公共字段,并且知道只有一列会在文件之间更改名称,您可以更改逻辑以重命名 parsed_df
中的列以协调模式。这将取决于您希望对架构执行多少要求。
我还会包括一个与其他方法相同的测试方法,以便您可以快速验证正确的解析行为。
我从一个大的 csvs zip 文件开始,我在 Palantir Foundry 中解压了它。
我现在有一个由多个 csvs(每年一个)组成的数据集,其中 csvs 几乎是相同的模式,但有一些差异。如何将架构单独应用于每个 csvs 或规范化它们之间的架构?
如果你的文件被解压缩并且只是作为 这将如下所示: 请注意, 如果您知道每个模式的公共字段,并且知道只有一列会在文件之间更改名称,您可以更改逻辑以重命名 我还会包括一个与其他方法相同的测试方法.csv
s 在你的数据集中,你可以使用 Spark 的本地 spark_session.read.csv
方法类似于我对 from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('csv').load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
union_many
动词会将您的模式堆叠在一起,因此如果您有许多 many 具有不同模式的文件,许多行将为空因为它们只会存在于一个文件中。parsed_df
中的列以协调模式。这将取决于您希望对架构执行多少要求。