如何在 DAG 在 Airflow 中完成其 运行 后删除 XCOM 对象

How to delete XCOM objects once the DAG finishes its run in Airflow

我在 XCOM 中有一个巨大的 json 文件,一旦 dag 执行完成我就不需要它了,但我仍然在 UI 中看到 Xcom 对象和所有数据,一旦 DAG 运行 完成,是否有任何方法以编程方式删除 XCOM。

谢谢

您必须添加一个任务,这取决于您在 DAG 运行 完成后删除 XCOM 的元数据数据库(sqllite、PostgreSql、MySql..)。

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)

您可以在 运行 dag 之前验证您的查询。

数据分析 -> 即席查询 -> airflow_db -> 查询 -> 运行!

下面是对我有用的代码,这将删除 DAG 中所有任务的 xcom(如果只需要删除特定任务的 xcom,请将 task_id 添加到 SQL):

因为 dag_id 是动态的,dates 应该遵循 SQL.[=11= 的相应语法]

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )

您可以通过 sqlalchemy 以编程方式执行清理,这样您的解决方案就不会在数据库结构发生变化时中断:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

您还可以清除旧的 XCom 数据:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

如果你想在 dag 完成后清除 XCom 我认为最干净的解决方案是使用 DAG 模型的 "on_success_callback" 属性 class:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

正在使用

from sqlalchemy import func 
[...]
session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

按日期过滤(如上所述)对我不起作用。相反,我必须提供日期时间(包括时区):

from airflow.models import XCom
from datetime import datetime, timedelta, timezone

[...]

@provide_session
def cleanup_xcom(session=None):
    ts_limit = datetime.now(timezone.utc) - timedelta(days=2)
    session.query(XCom).filter(XCom.execution_date <= ts_limit).delete()
    logging.info(f"deleted all XCOMs older than {ts_limit}")

xcom_cleaner = python_operator.PythonOperator(
    task_id='delete-old-xcoms',
    python_callable=cleanup_xcom)

xcom_cleaner 

我对这个问题的解决方案是:

from airflow.utils.db import provide_session
from airflow.models import XCom

dag = DAG(...)

@provide_session
def cleanup_xcom(**context):     
    dag = context["dag"]
    dag_id = dag._dag_id 
    session=context["session"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

clean_xcom = PythonOperator(
    task_id="clean_xcom",
    python_callable = cleanup_xcom,
    provide_context=True, 
    dag=dag
)

clean_xcom

在 Airflow 2.1.x 中,下面的代码无法正常工作...

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)

所以改为

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:
    # cleanup_xcom
    @provide_session
    def cleanup_xcom(session=None, **context):
        dag = context["dag"]
        dag_id = dag._dag_id 
        # It will delete all xcom of the dag_id
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable = cleanup_xcom,
        provide_context=True, 
        # dag=dag
    )
    
    start  = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")
    
    start >> clean_xcom >> end