重新处理 Airflow 上的历史数据

Reprocess historical data on Airflow

我正在将一些管道迁移到气流。我希望能够 运行 在历史负载的特定时间范围内启用一些 DAG,并且我正在探索我的选择。注意:我不想重新执行之前的 运行s(例如过去 10 天),但我希望能够根据 last_loaded 时间戳变量(例如 2017-12- 09 00:00:00.000000) 在我需要的任何时候(甚至在创建 DAG 之前)。该变量也用于外部调用API。

我脑子里总共有4个概念:

  1. 当前的 dag 运行 通过元数据数据库中的 xcom table 实现交换此变量。虽然每次我想修改它时,我都必须更新一个数据类型为 blob 的字段。我什至不确定这是否可能。

  2. 将此参数保存在别处。易于实施的解决方案,但我不想重新发明轮子。如果总是有一些功能由气流实现我想探索它。

  3. 气流变量:到目前为止可能没有最受认可的气流概念,但我确实觉得这就是我想要的。

  4. 回填:如果我没记错的话,这是附加到之前的执行中的。因此,如果我的 dag 在 12 月开始每天 运行ning,我将无法从 8 月加载数据。

有什么建议吗?

对于此用例,您可以按如下方式处理 ETL:

  1. 从变量中读取最后一个 last_loaded 值。
  2. 运行 last_loaded 到 current_timestamp 或 execution_date 或您选择的任何更高边界之间的 ETL。
  3. 将上边界存储到变量中。

骨架概述可以是:

def set_dag_variables(**kwargs):
    new_value = kwargs['var_value']
    Variable.set(key=DAG_ID, value=new_value, serialize_json=True)


last_loaded = Varible.get(key=var_name) # don't do this in production. Use macro instead.
your_higher_boundary_param = datetime.now(tz=None)

op1 = YourOperaror(
    task_id='op1_task',
    params = {"param1":last_loaded,
             param2: your_higher_boundary_param }
)
op2 = PythonOperator(
    task_id='set_dag_variable_task',
    provide_context=True,
    python_callable=set_dag_variables,
    op_kwargs={'var_value': your_higher_boundary_param}
    )
op1 >> op2

注意:这是非常高的级别,细节很重要!

例如,我在 operator/macro 范围之外使用了 Varible.get,这是一种不好的做法。正确的方法是使用宏,但为了示例的建议我简化了它。