在 S3ToRedshiftOperator 中拉取 xcom 变量时未定义 ti

ti is not defined while pulling xcom variable in S3ToRedshiftOperator

我正在使用 S3ToRedshiftOperator 将 csv 文件加载到 Redshift 数据库中。请帮助将 xcom 变量传递给 S3ToRedshiftOperator。我们如何在不使用自定义功能的情况下推送 xcom?

错误:

NameError: name 'ti' is not defined

使用以下代码:

from airflow.operators.s3_to_redshift_operator import S3ToRedshiftOperator

def export_db_fn(**kwargs):
session = settings.Session()
outkey = S3_KEY.format(MWAA_ENV_NAME, name[6:])
print(outkey)
s3_client.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
ti.xcom_push(key='FILE_PATH', value=outkey)
return "OK"


with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
export_info = PythonOperator(
    task_id="export_info",
    python_callable=export_db_fn,
    provide_context=True     
)


transfer_s3_to_redshift = S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key="{{ti.xcom_pull(key='FILE_PATH', task_ids='export_info')}}",
        schema="dw_stage",
        table=REDSHIFT_TABLE,
        copy_options=['csv',"IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        autocommit=True,
        task_id='transfer_s3_to_redshift',
    )
            
    
start >> export_info >> transfer_s3_to_redshift >> end 

错误信息说明了问题所在。 ti 未定义。

当您设置 provide_context=True 时,Airflow 会在 python 可调用对象中为您提供上下文。其中一个属性是 ti(参见 source code)。所以你需要从kwargs中提取它或者在函数签名中设置它。

您的代码应该是:

def export_db_fn(**kwargs):
    ...
    ti = kwargs['ti']
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...

或者如果你想直接使用ti那么:

def export_db_fn(ti, **kwargs):
    ...
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...

注意:在 Airflow >= 2.0 中不需要设置 provide_context=True