在 Kedro 中在哪里执行节点输出的保存?

Where to perform the saving of an nodeoutput in Kedro?

在 Kedro 中,我们可以流水线化不同的节点和部分 运行 一些节点。当我们部分 运行 一些节点时,我们需要将一些来自节点的输入保存在某个地方,以便当另一个节点 运行 它可以访问前一个节点生成的数据。但是,我们在哪个文件中为此编写代码 - pipeline.py、run.py 或 nodes.py?

例如,我试图将目录路径直接保存到变量名 'model_path' 下的 DataCatalog。

来自 pipeline.py 的片段:

    # A mapping from a pipeline name to a ``Pipeline`` object.
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
io = DataCatalog(dict(
    model_path=MemoryDataSet()
))

io.save('model_path', "data/06_models/model_test")
print('****', io.exists('model_path'))

pipeline = Pipeline([
    node(
        split_files,
        ["data_csv", "parameters"],
        ["train_filenames", "val_filenames", "train_labels", "val_labels"],
        name="splitting filenames"
    ),
    # node(
    #     create_and_train,
    #     ["train_filenames", "val_filenames", "train_labels", "val_labels", "parameters"],
    #     "model_path",
    #     name="Create Dataset, Train and Save Model"
    # ),
    node(
        validate_model,
        ["val_filenames", "val_labels", "model_path"],
        None,
        name="Validate Model",
    )

]).decorate(decorators.log_time, decorators.mem_profile)

return {
    "__default__": pipeline
}

但是,当我使用 Kedro 运行 时出现以下错误:

ValueError: Pipeline input(s) {'model_path'} not found in the DataCatalog

节点输入在传递给节点函数之前由 Kedro 从 DataCatalog 自动加载。在节点成功生成一些数据后,节点输出结果将保存到 DataCatalog 中。 DataCatalog 配置默认取自 conf/base/catalog.yml

在您的示例中,model_pathCreate Dataset, Train and Save Model 节点生成,然后由 Validate Model 消耗。如果在 conf/base/catalog.yml 中找不到所需的数据集定义,Kedro 将尝试使用 MemoryDataSet 将此数据集存储在内存中。如果您 运行 包含 Create Dataset...Validate Model 节点的管道(假设没有其他问题出现),这将起作用。但是,当您尝试单独 运行 Validate Model 节点时,Kedro 会尝试从内存中读取 model_path 数据集,而内存中不存在该数据集。

所以,TLDR:

要缓解这种情况,您需要:

a) 通过在 conf/base/catalog.yml:

中添加如下内容来坚持 model_path
model_path:
  type: TextLocalDataSet
  filepath: data/02_intermediate/model_path.txt

b) 运行 Create Dataset, Train and Save Model 节点(及其依赖项)至少一次

完成 a) 和 b) 后,您应该可以分别开始 运行ning Validate Model