如何动态传递 save_args 到 kedro 目录?
How to dynamically pass save_args to kedro catalog?
我正在尝试在 Kedro 中编写增量表。将文件格式更改为 delta 使得写入为 delta 表,模式为覆盖。
之前,原始层 (meta_reload) 中的一个节点创建了一个数据集,用于确定每个数据集增量加载的开始日期。每个节点使用该原始数据集来过滤工作数据集以应用转换逻辑并增量写入分区镶木地板表。
但是现在用 mode as overwrite 写入 delta,只需将文件类型更改为 delta 即可使当前增量数据覆盖所有过去的数据,而不仅仅是那些分区。所以我需要在目录中的 save_args 中使用 replaceWhere 选项。
当我需要读取 meta_reload 原始数据集以确定日期时,如何确定目录中 replaceWhere 的开始日期。
有没有办法从节点内部动态传递 save_args?
my_dataset:
type: my_project.io.pyspark.SparkDataSet
filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
file_format: delta
layer: intermediate
save_args:
mode: "overwrite"
replaceWhere: "DATE_ID > xyz" ## what I want to implement dynamically
partitionBy: [ "DATE_ID" ]
我已经在 GH discussion 上回答了这个问题。简而言之,您需要子类化并定义您自己的 SparkDataSet
我们避免在 Kedro 级别更改数据集的基础 API,但我们鼓励您根据自己的目的更改和重新混合它。
我正在尝试在 Kedro 中编写增量表。将文件格式更改为 delta 使得写入为 delta 表,模式为覆盖。
之前,原始层 (meta_reload) 中的一个节点创建了一个数据集,用于确定每个数据集增量加载的开始日期。每个节点使用该原始数据集来过滤工作数据集以应用转换逻辑并增量写入分区镶木地板表。
但是现在用 mode as overwrite 写入 delta,只需将文件类型更改为 delta 即可使当前增量数据覆盖所有过去的数据,而不仅仅是那些分区。所以我需要在目录中的 save_args 中使用 replaceWhere 选项。 当我需要读取 meta_reload 原始数据集以确定日期时,如何确定目录中 replaceWhere 的开始日期。 有没有办法从节点内部动态传递 save_args?
my_dataset:
type: my_project.io.pyspark.SparkDataSet
filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
file_format: delta
layer: intermediate
save_args:
mode: "overwrite"
replaceWhere: "DATE_ID > xyz" ## what I want to implement dynamically
partitionBy: [ "DATE_ID" ]
我已经在 GH discussion 上回答了这个问题。简而言之,您需要子类化并定义您自己的 SparkDataSet
我们避免在 Kedro 级别更改数据集的基础 API,但我们鼓励您根据自己的目的更改和重新混合它。