使用任务导致气流中的下游任务
Using task results in downstream task in airflow
我有一个气流 DAG,我需要在其中列出 S3 对象键并在下游任务中使用它们
我在任务开始时将前缀中的 S3 键添加到列表中。此列表是在全球范围内创建的。任务完成后,当我打印列表时,我可以打印密钥。但是在任务完成后,当我尝试访问同一个列表时,我无法访问同一个列表。相反,我得到的是最初创建的空列表。我观察到在任务中声明为全局变量的情况相同。我创建了名称为 start_timestamp
的变量,并在第一个任务中将其设为全局变量并分配了一些值。当我在下游任务中访问此变量时,我看到的是旧值,而不是我在之前的任务中更新的值。
这是什么原因?有没有一种方法我们可以访问这些变量而不将它们发送到 XCom?
Airflow execution 并不意味着共享变量,即使它们是全局的。 Airflow 可以 运行 在不同的工作人员中执行不同的任务,因此不会默认在这些工作人员之间共享内存。
也就是说,如果您不想使用 XComs
(小数据的最佳实践),您唯一的选择就是使用 Variables
。从 Airflow's best practice documentation 可以看出,如果可以模板化它们,建议使用它(这取决于运营商)。
如果您想使用该选项,则必须设置变量并在需要使用时获取它。我想说得很清楚:不建议在 DAG 的主要定义中设置和获取变量,因为它会创建到元数据库的连接。如果可能的话,我会选择 XComs
。
我有一个气流 DAG,我需要在其中列出 S3 对象键并在下游任务中使用它们
我在任务开始时将前缀中的 S3 键添加到列表中。此列表是在全球范围内创建的。任务完成后,当我打印列表时,我可以打印密钥。但是在任务完成后,当我尝试访问同一个列表时,我无法访问同一个列表。相反,我得到的是最初创建的空列表。我观察到在任务中声明为全局变量的情况相同。我创建了名称为 start_timestamp
的变量,并在第一个任务中将其设为全局变量并分配了一些值。当我在下游任务中访问此变量时,我看到的是旧值,而不是我在之前的任务中更新的值。
这是什么原因?有没有一种方法我们可以访问这些变量而不将它们发送到 XCom?
Airflow execution 并不意味着共享变量,即使它们是全局的。 Airflow 可以 运行 在不同的工作人员中执行不同的任务,因此不会默认在这些工作人员之间共享内存。
也就是说,如果您不想使用 XComs
(小数据的最佳实践),您唯一的选择就是使用 Variables
。从 Airflow's best practice documentation 可以看出,如果可以模板化它们,建议使用它(这取决于运营商)。
如果您想使用该选项,则必须设置变量并在需要使用时获取它。我想说得很清楚:不建议在 DAG 的主要定义中设置和获取变量,因为它会创建到元数据库的连接。如果可能的话,我会选择 XComs
。