使用 Dagster 进行交叉验证
Cross Validation using Dagster
我已经开始在我们的 ML 管道中使用 Dagster,并且正在 运行 解决一些基本问题,我想知道我是否遗漏了一些微不足道的东西,或者这是否就是这样...... .
假设我有一个简单的 ML 管道:
Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.
Dagster 中的线性模型很简单。但是如果我想添加一个小循环怎么办,比如为了交叉验证目的:
Load raw data --> Process data into table --> Split into k folds, and for each fold:
- fold 1: train model --> evaluate
- fold 2: train model --> evaluate
- fold 3: train model --> evaluate
--> summarize cross validation results.
在 Dagster 中是否有一种简洁明了的方法来做到这一点?我做事的方式是:
Load raw data --> Process data into table --> Split into K folds --> choose fold k --> train model --> evaluate model
将折叠 "k" 作为管道的输入参数。然后运行管道K次。
我在这里错过了什么?
是的,Dagster 确实支持在单个管道中将实体扇出到多个实体,而不是扇入到汇实体(即汇总结果)。这是一些示例代码和 dagit (full dag and zoomed in) 中相应的 dag 可视化。
@solid
def load_raw_data(_):
yield Output('loaded_data')
@solid
def process_data_into_table(_, raw_data):
yield Output(raw_data)
@solid(
output_defs=[
OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
],
)
def split_into_two_folds(_, table):
yield Output(1, 'fold_one')
yield Output(2, 'fold_two')
@solid
def train_fold(_, fold):
yield Output('model')
@solid
def evaluate_fold(_, model):
yield Output('compute_result')
@composite_solid
def process_fold(fold):
return evaluate_fold(train_fold(fold))
@solid
def summarize_results(context, fold_1_result, fold_2_result):
yield Output('summary_stats')
@pipeline
def ml_pipeline():
fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))
process_fold_one = process_fold.alias('process_fold_one')
process_fold_two = process_fold.alias('process_fold_two')
summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))
在示例代码中,我们使用别名来为每个折叠重复使用相同的逻辑。我们还整合了处理复合实体中每个折叠的逻辑。
另一种选择是以编程方式直接创建 PipelineDefinition,但我会推荐上面的方法。
我已经开始在我们的 ML 管道中使用 Dagster,并且正在 运行 解决一些基本问题,我想知道我是否遗漏了一些微不足道的东西,或者这是否就是这样...... .
假设我有一个简单的 ML 管道:
Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.
Dagster 中的线性模型很简单。但是如果我想添加一个小循环怎么办,比如为了交叉验证目的:
Load raw data --> Process data into table --> Split into k folds, and for each fold:
- fold 1: train model --> evaluate
- fold 2: train model --> evaluate
- fold 3: train model --> evaluate
--> summarize cross validation results.
在 Dagster 中是否有一种简洁明了的方法来做到这一点?我做事的方式是:
Load raw data --> Process data into table --> Split into K folds --> choose fold k --> train model --> evaluate model
将折叠 "k" 作为管道的输入参数。然后运行管道K次。
我在这里错过了什么?
是的,Dagster 确实支持在单个管道中将实体扇出到多个实体,而不是扇入到汇实体(即汇总结果)。这是一些示例代码和 dagit (full dag and zoomed in) 中相应的 dag 可视化。
@solid
def load_raw_data(_):
yield Output('loaded_data')
@solid
def process_data_into_table(_, raw_data):
yield Output(raw_data)
@solid(
output_defs=[
OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
],
)
def split_into_two_folds(_, table):
yield Output(1, 'fold_one')
yield Output(2, 'fold_two')
@solid
def train_fold(_, fold):
yield Output('model')
@solid
def evaluate_fold(_, model):
yield Output('compute_result')
@composite_solid
def process_fold(fold):
return evaluate_fold(train_fold(fold))
@solid
def summarize_results(context, fold_1_result, fold_2_result):
yield Output('summary_stats')
@pipeline
def ml_pipeline():
fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))
process_fold_one = process_fold.alias('process_fold_one')
process_fold_two = process_fold.alias('process_fold_two')
summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))
在示例代码中,我们使用别名来为每个折叠重复使用相同的逻辑。我们还整合了处理复合实体中每个折叠的逻辑。
另一种选择是以编程方式直接创建 PipelineDefinition,但我会推荐上面的方法。