重新处理 Airflow 上的历史数据
Reprocess historical data on Airflow
我正在将一些管道迁移到气流。我希望能够 运行 在历史负载的特定时间范围内启用一些 DAG,并且我正在探索我的选择。注意:我不想重新执行之前的 运行s(例如过去 10 天),但我希望能够根据 last_loaded 时间戳变量(例如 2017-12- 09 00:00:00.000000) 在我需要的任何时候(甚至在创建 DAG 之前)。该变量也用于外部调用API。
我脑子里总共有4个概念:
当前的 dag 运行 通过元数据数据库中的 xcom table 实现交换此变量。虽然每次我想修改它时,我都必须更新一个数据类型为 blob 的字段。我什至不确定这是否可能。
将此参数保存在别处。易于实施的解决方案,但我不想重新发明轮子。如果总是有一些功能由气流实现我想探索它。
气流变量:到目前为止可能没有最受认可的气流概念,但我确实觉得这就是我想要的。
回填:如果我没记错的话,这是附加到之前的执行中的。因此,如果我的 dag 在 12 月开始每天 运行ning,我将无法从 8 月加载数据。
有什么建议吗?
对于此用例,您可以按如下方式处理 ETL:
- 从变量中读取最后一个 last_loaded 值。
- 运行 last_loaded 到 current_timestamp 或 execution_date 或您选择的任何更高边界之间的 ETL。
- 将上边界存储到变量中。
骨架概述可以是:
def set_dag_variables(**kwargs):
new_value = kwargs['var_value']
Variable.set(key=DAG_ID, value=new_value, serialize_json=True)
last_loaded = Varible.get(key=var_name) # don't do this in production. Use macro instead.
your_higher_boundary_param = datetime.now(tz=None)
op1 = YourOperaror(
task_id='op1_task',
params = {"param1":last_loaded,
param2: your_higher_boundary_param }
)
op2 = PythonOperator(
task_id='set_dag_variable_task',
provide_context=True,
python_callable=set_dag_variables,
op_kwargs={'var_value': your_higher_boundary_param}
)
op1 >> op2
注意:这是非常高的级别,细节很重要!
例如,我在 operator/macro 范围之外使用了 Varible.get
,这是一种不好的做法。正确的方法是使用宏,但为了示例的建议我简化了它。
我正在将一些管道迁移到气流。我希望能够 运行 在历史负载的特定时间范围内启用一些 DAG,并且我正在探索我的选择。注意:我不想重新执行之前的 运行s(例如过去 10 天),但我希望能够根据 last_loaded 时间戳变量(例如 2017-12- 09 00:00:00.000000) 在我需要的任何时候(甚至在创建 DAG 之前)。该变量也用于外部调用API。
我脑子里总共有4个概念:
当前的 dag 运行 通过元数据数据库中的 xcom table 实现交换此变量。虽然每次我想修改它时,我都必须更新一个数据类型为 blob 的字段。我什至不确定这是否可能。
将此参数保存在别处。易于实施的解决方案,但我不想重新发明轮子。如果总是有一些功能由气流实现我想探索它。
气流变量:到目前为止可能没有最受认可的气流概念,但我确实觉得这就是我想要的。
回填:如果我没记错的话,这是附加到之前的执行中的。因此,如果我的 dag 在 12 月开始每天 运行ning,我将无法从 8 月加载数据。
有什么建议吗?
对于此用例,您可以按如下方式处理 ETL:
- 从变量中读取最后一个 last_loaded 值。
- 运行 last_loaded 到 current_timestamp 或 execution_date 或您选择的任何更高边界之间的 ETL。
- 将上边界存储到变量中。
骨架概述可以是:
def set_dag_variables(**kwargs):
new_value = kwargs['var_value']
Variable.set(key=DAG_ID, value=new_value, serialize_json=True)
last_loaded = Varible.get(key=var_name) # don't do this in production. Use macro instead.
your_higher_boundary_param = datetime.now(tz=None)
op1 = YourOperaror(
task_id='op1_task',
params = {"param1":last_loaded,
param2: your_higher_boundary_param }
)
op2 = PythonOperator(
task_id='set_dag_variable_task',
provide_context=True,
python_callable=set_dag_variables,
op_kwargs={'var_value': your_higher_boundary_param}
)
op1 >> op2
注意:这是非常高的级别,细节很重要!
例如,我在 operator/macro 范围之外使用了 Varible.get
,这是一种不好的做法。正确的方法是使用宏,但为了示例的建议我简化了它。