How can I have nice file names & efficient storage usage in my Foundry Magritte dataset export?
I'm using a variety of Magritte export tasks to export data from Foundry datasets in parquet format into an ABFS system (but the same applies to SFTP, S3, HDFS, and other file-based exports).
The datasets I'm exporting are relatively small, under 512 MB in size, which means they don't need to be split across multiple parquet files; putting all the data in a single file is enough. I've done this by ending the upstream transform with a .coalesce(1) so that all the data ends up in one file.
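For context, a minimal sketch of what that upstream transform could look like (the dataset paths and the processing step are placeholders, not my actual pipeline):
from transforms.api import transform_df, Input, Output

@transform_df(
    Output("/path/to/input"),  # this dataset is what the export task / renaming transform reads
    source=Input("/path/to/some_upstream_dataset"),  # placeholder upstream dataset
)
def compute(source):
    # ... whatever processing is needed ...
    return source.coalesce(1)  # one partition, so a single .snappy.parquet file is written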
The problem is:
- By default the file is named part-0000-<rid>.snappy.parquet, with a different rid on every build. That means whenever a new file is uploaded, it lands in the same folder as the previous ones, and the only way to tell which one is the latest version is by last modified date.
- Every version of the data is kept in my external system, which uses up unnecessary storage unless I regularly go in and delete old files.
All of this is unnecessary complexity being added to my downstream system; I just want to be able to pull the latest version of the data in a single step.
This can be achieved by renaming the single parquet file in the dataset so that it always has the same file name; the export task will then overwrite the previous file in the external system.
This can be done using raw file system access. The write_single_named_parquet_file function below validates its inputs, creates a file with the given name in the output dataset, and then copies the file in the input dataset to it. The result is a schemaless output dataset that contains a single named parquet file.
Notes
- The build will fail if the input contains more than one parquet file; as pointed out in the question, .coalesce(1) (or .repartition(1)) needs to be called in the upstream transform.
- If you require transaction history in your external store, or your datasets are much larger than 512 MB, this approach is not suitable, as only the latest version is kept and you'll likely want to use multiple parquet files downstream. The createTransactionFolders (put each new export file in a different folder) and flagFile (create a flag file once all files are written) options are useful in this case.
- The transform does not require any spark executors, so it is possible to use @configure() to give it a driver-only profile. Giving the driver additional memory should fix out-of-memory errors when working with larger datasets (a hedged sketch follows these notes).
- shutil.copyfileobj is used because the 'files' that are opened are really just file objects.
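For reference, a hedged sketch of that driver-only configuration. The profile names used here (KUBERNETES_NO_EXECUTORS, DRIVER_MEMORY_MEDIUM) are assumptions; which Spark profiles are available depends on your environment, so check what is enabled for your repository:
from transforms.api import transform, configure, Input, Output
from . import utils  # the utils module from the full snippet below

@configure(profile=["KUBERNETES_NO_EXECUTORS", "DRIVER_MEMORY_MEDIUM"])  # assumed profile names, check your enrollment
@transform(
    output=Output("/path/to/output"),
    source_df=Input("/path/to/input"),
)
def compute(output, source_df):
    return utils.write_single_named_parquet_file(output, source_df, "readable_file_name")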
Full code snippet
example_transform.py
from transforms.api import transform, Input, Output
from . import utils  # assumes utils.py lives in the same package as this transform

@transform(
    output=Output("/path/to/output"),
    source_df=Input("/path/to/input"),
)
def compute(output, source_df):
    return utils.write_single_named_parquet_file(output, source_df, "readable_file_name")
utils.py
from transforms.api import Input, Output
import shutil
import logging

log = logging.getLogger(__name__)


def write_single_named_parquet_file(output: Output, input: Input, file_name: str):
    """Write a single ".snappy.parquet" file with a given file name to a transforms output, containing the data of
    the single ".snappy.parquet" file in the transforms input. This is useful when you need to export the data using
    magritte and want a human readable name in the output; when not using separate transaction folders this should
    cause the previous output to be automatically overwritten.

    The input to this function must contain a single ".snappy.parquet" file, this can be achieved by calling
    `.coalesce(1)` or `.repartition(1)` on your dataframe at the end of the upstream transform that produces the input.

    This function should not be used for large dataframes (e.g. those greater than 512 MB in size), instead
    transaction folders should be enabled in the export. This function can work for larger sizes, but you may find
    you need additional driver memory to perform both the coalesce/repartition in the upstream transform, and here.

    This produces a dataset without a schema, so features like expectations can't be used.

    Parameters:
        output (Output): The transforms output to write the single custom named ".snappy.parquet" file to, this is
            the dataset you want to export
        input (Input): The transforms input containing the data to be written to output, this must contain only one
            ".snappy.parquet" file (it can contain other files, for example logs)
        file_name: The name of the file to be written, the ".snappy.parquet" suffix will be automatically appended
            if not already there, and ".snappy" and ".parquet" will be corrected to ".snappy.parquet"

    Raises:
        RuntimeError: Input dataset must be coalesced or repartitioned into a single file.
        RuntimeError: Input dataset file system cannot be empty.

    Returns:
        void: writes the response to output, no return value
    """
    output.set_mode("replace")  # Make sure it is snapshotting

    input_files_df = input.filesystem().files()  # Get all files
    input_files = [row[0] for row in input_files_df.collect()]  # noqa - first column in files_df is path
    input_files = [f for f in input_files if f.endswith(".snappy.parquet")]  # filter non parquet files
    if len(input_files) > 1:
        raise RuntimeError("Input dataset must be coalesced or repartitioned into a single file.")
    if len(input_files) == 0:
        raise RuntimeError("Input dataset file system cannot be empty.")
    input_file_path = input_files[0]

    log.info("Initial output file name: " + file_name)
    # check for snappy.parquet and append if needed
    if file_name.endswith(".snappy.parquet"):
        pass  # if it is already correct, do nothing
    elif file_name.endswith(".parquet"):
        # if it ends with ".parquet" (and not ".snappy.parquet"), remove ".parquet" and append ".snappy.parquet"
        file_name = file_name.removesuffix(".parquet") + ".snappy.parquet"
    elif file_name.endswith(".snappy"):
        # if it ends with just ".snappy" then append ".parquet"
        file_name = file_name + ".parquet"
    else:
        # if it doesn't end with any of the above, add ".snappy.parquet"
        file_name = file_name + ".snappy.parquet"
    log.info("Final output file name: " + file_name)

    with input.filesystem().open(input_file_path, "rb") as in_f:  # open the input file
        with output.filesystem().open(file_name, "wb") as out_f:  # open the output file
            shutil.copyfileobj(in_f, out_f)  # write the file into a new file
You can also use the rewritePaths functionality of the export plugin to rename the file under spark/*.snappy.parquet to "export.parquet" at export time. This of course only works if there is a single file, so the .coalesce(1) in the transform is a must:
excludePaths:
  - ^_.*
  - ^spark/_.*
rewritePaths:
  '^spark/(.*[\/])(.*)': /export.parquet
uploadConfirmation: exportedFiles
incrementalType: snapshot
retriesPerFile: 0
bucketPolicy: BucketOwnerFullControl
directoryPath: features
setBucketPolicy: true
I ran into the same requirement, the only difference being that the dataset needed to be split into multiple parts because of its size. Posting the code here along with how I updated it to handle this use case.
import shutil

from transforms.api import Output


def rename_multiple_parquet_outputs(output: Output, input, file_name_prefix: str):
    """
    Slight improvement to allow multiple output files to be renamed
    """
    output.set_mode("replace")  # Make sure it is snapshotting

    input_files_df = input.filesystem().files()  # Get all files
    input_files = [row[0] for row in input_files_df.collect()]  # noqa - first column in files_df is path
    input_files = [f for f in input_files if f.endswith(".snappy.parquet")]  # filter non parquet files
    if len(input_files) == 0:
        raise RuntimeError("Input dataset file system cannot be empty.")

    print(f"input files {input_files}")
    print("prefix for target name: " + file_name_prefix)

    for i, f in enumerate(input_files):
        with input.filesystem().open(f, "rb") as in_f:  # open the input file
            with output.filesystem().open(f"{file_name_prefix}_part_{i}.snappy.parquet", "wb") as out_f:  # open the output file
                shutil.copyfileobj(in_f, out_f)  # write the file into a new file
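For completeness, a hedged sketch of how this could be wired into a transform, mirroring example_transform.py above (the paths are placeholders, and the function is assumed to have been added to the same utils module):
from transforms.api import transform, Input, Output
from . import utils  # assumes rename_multiple_parquet_outputs was added to utils.py

@transform(
    output=Output("/path/to/output"),
    source_df=Input("/path/to/multi_file_input"),  # placeholder input containing several part files
)
def compute(output, source_df):
    return utils.rename_multiple_parquet_outputs(output, source_df, "readable_file_name")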
Also, to use this in a Code Workbook, the input needs to be persisted and the output parameter can be retrieved as shown below.
def rename_outputs(persisted_input):
    output = Transforms.get_output()
    rename_multiple_parquet_outputs(output, persisted_input, "prefix_for_renamed_files")