如何动态组织不同长度的运算符组
How to dynamically organize group of operators of different lengths
我正在动态生成运算符,最终填充这样的运算符结构:
operators_dict = {0: [snowflake_1, [mysql_1, s3_1]], 1: [snowflake_2, s3_2]}
interlude_operators = [dummy_op_1]
sfn_levels = 2
operators_dict 可以有任意数量的键,值可以有任意数量的项目,它们可以是具有 2 个运算符或单个运算符的列表。特定的 operators_dict 可能只有单个运算符,如 {0: [snow_1, snow_2, snow_3]...}
,因此在所有情况下都不会发生嵌套列表的情况。
使用该示例的 DAG 所需的输出图视图如下:
我试过这个:
for i in range(sfn_levels - 1):
operators_dict[i] >> interlude_operators[i] >> operators_dict[i + 1]
但是我得到了错误
Relationships can only be set between Operators; received list
我该如何解决这个问题?非常感谢您
你不能这样做。 >> 运算符仅将 Task 或 List[Task] 作为左侧操作数。你正在传递它 List[Union[Task, List[Task]].
如果要保留此结构,您必须在那里做更多的手动工作并手动添加一些依赖项。
@NicoE 建议的任务组也是个好主意,那么你可以有任意复杂的结构,不限于任务和任务列表。
还有一条评论:更新 << 运算符也以这种方式工作并处理 List[Union[Task, List[Task]] 将是一个有趣的补充——也许您可以添加一个 PR,将此功能添加到 Airflow ?
我正在动态生成运算符,最终填充这样的运算符结构:
operators_dict = {0: [snowflake_1, [mysql_1, s3_1]], 1: [snowflake_2, s3_2]}
interlude_operators = [dummy_op_1]
sfn_levels = 2
operators_dict 可以有任意数量的键,值可以有任意数量的项目,它们可以是具有 2 个运算符或单个运算符的列表。特定的 operators_dict 可能只有单个运算符,如 {0: [snow_1, snow_2, snow_3]...}
,因此在所有情况下都不会发生嵌套列表的情况。
使用该示例的 DAG 所需的输出图视图如下:
我试过这个:
for i in range(sfn_levels - 1):
operators_dict[i] >> interlude_operators[i] >> operators_dict[i + 1]
但是我得到了错误
Relationships can only be set between Operators; received list
我该如何解决这个问题?非常感谢您
你不能这样做。 >> 运算符仅将 Task 或 List[Task] 作为左侧操作数。你正在传递它 List[Union[Task, List[Task]].
如果要保留此结构,您必须在那里做更多的手动工作并手动添加一些依赖项。
@NicoE 建议的任务组也是个好主意,那么你可以有任意复杂的结构,不限于任务和任务列表。
还有一条评论:更新 << 运算符也以这种方式工作并处理 List[Union[Task, List[Task]] 将是一个有趣的补充——也许您可以添加一个 PR,将此功能添加到 Airflow ?