在 Kedro 中在哪里执行节点输出的保存?
Where to perform the saving of an nodeoutput in Kedro?
在 Kedro 中,我们可以流水线化不同的节点和部分 运行 一些节点。当我们部分 运行 一些节点时,我们需要将一些来自节点的输入保存在某个地方,以便当另一个节点 运行 它可以访问前一个节点生成的数据。但是,我们在哪个文件中为此编写代码 - pipeline.py、run.py 或 nodes.py?
例如,我试图将目录路径直接保存到变量名 'model_path' 下的 DataCatalog。
来自 pipeline.py 的片段:
# A mapping from a pipeline name to a ``Pipeline`` object.
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
io = DataCatalog(dict(
model_path=MemoryDataSet()
))
io.save('model_path', "data/06_models/model_test")
print('****', io.exists('model_path'))
pipeline = Pipeline([
node(
split_files,
["data_csv", "parameters"],
["train_filenames", "val_filenames", "train_labels", "val_labels"],
name="splitting filenames"
),
# node(
# create_and_train,
# ["train_filenames", "val_filenames", "train_labels", "val_labels", "parameters"],
# "model_path",
# name="Create Dataset, Train and Save Model"
# ),
node(
validate_model,
["val_filenames", "val_labels", "model_path"],
None,
name="Validate Model",
)
]).decorate(decorators.log_time, decorators.mem_profile)
return {
"__default__": pipeline
}
但是,当我使用 Kedro 运行 时出现以下错误:
ValueError: Pipeline input(s) {'model_path'} not found in the DataCatalog
节点输入在传递给节点函数之前由 Kedro 从 DataCatalog
自动加载。在节点成功生成一些数据后,节点输出结果将保存到 DataCatalog 中。 DataCatalog 配置默认取自 conf/base/catalog.yml
。
在您的示例中,model_path
由 Create Dataset, Train and Save Model
节点生成,然后由 Validate Model
消耗。如果在 conf/base/catalog.yml
中找不到所需的数据集定义,Kedro 将尝试使用 MemoryDataSet
将此数据集存储在内存中。如果您 运行 包含 Create Dataset...
和 Validate Model
节点的管道(假设没有其他问题出现),这将起作用。但是,当您尝试单独 运行 Validate Model
节点时,Kedro 会尝试从内存中读取 model_path
数据集,而内存中不存在该数据集。
所以,TLDR:
要缓解这种情况,您需要:
a) 通过在 conf/base/catalog.yml
:
中添加如下内容来坚持 model_path
model_path:
type: TextLocalDataSet
filepath: data/02_intermediate/model_path.txt
b) 运行 Create Dataset, Train and Save Model
节点(及其依赖项)至少一次
完成 a) 和 b) 后,您应该可以分别开始 运行ning Validate Model
。
在 Kedro 中,我们可以流水线化不同的节点和部分 运行 一些节点。当我们部分 运行 一些节点时,我们需要将一些来自节点的输入保存在某个地方,以便当另一个节点 运行 它可以访问前一个节点生成的数据。但是,我们在哪个文件中为此编写代码 - pipeline.py、run.py 或 nodes.py?
例如,我试图将目录路径直接保存到变量名 'model_path' 下的 DataCatalog。
来自 pipeline.py 的片段:
# A mapping from a pipeline name to a ``Pipeline`` object.
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
io = DataCatalog(dict(
model_path=MemoryDataSet()
))
io.save('model_path', "data/06_models/model_test")
print('****', io.exists('model_path'))
pipeline = Pipeline([
node(
split_files,
["data_csv", "parameters"],
["train_filenames", "val_filenames", "train_labels", "val_labels"],
name="splitting filenames"
),
# node(
# create_and_train,
# ["train_filenames", "val_filenames", "train_labels", "val_labels", "parameters"],
# "model_path",
# name="Create Dataset, Train and Save Model"
# ),
node(
validate_model,
["val_filenames", "val_labels", "model_path"],
None,
name="Validate Model",
)
]).decorate(decorators.log_time, decorators.mem_profile)
return {
"__default__": pipeline
}
但是,当我使用 Kedro 运行 时出现以下错误:
ValueError: Pipeline input(s) {'model_path'} not found in the DataCatalog
节点输入在传递给节点函数之前由 Kedro 从 DataCatalog
自动加载。在节点成功生成一些数据后,节点输出结果将保存到 DataCatalog 中。 DataCatalog 配置默认取自 conf/base/catalog.yml
。
在您的示例中,model_path
由 Create Dataset, Train and Save Model
节点生成,然后由 Validate Model
消耗。如果在 conf/base/catalog.yml
中找不到所需的数据集定义,Kedro 将尝试使用 MemoryDataSet
将此数据集存储在内存中。如果您 运行 包含 Create Dataset...
和 Validate Model
节点的管道(假设没有其他问题出现),这将起作用。但是,当您尝试单独 运行 Validate Model
节点时,Kedro 会尝试从内存中读取 model_path
数据集,而内存中不存在该数据集。
所以,TLDR:
要缓解这种情况,您需要:
a) 通过在 conf/base/catalog.yml
:
model_path
model_path:
type: TextLocalDataSet
filepath: data/02_intermediate/model_path.txt
b) 运行 Create Dataset, Train and Save Model
节点(及其依赖项)至少一次
完成 a) 和 b) 后,您应该可以分别开始 运行ning Validate Model
。