在 S3ToRedshiftOperator 中拉取 xcom 变量时未定义 ti
ti is not defined while pulling xcom variable in S3ToRedshiftOperator
我正在使用 S3ToRedshiftOperator 将 csv 文件加载到 Redshift 数据库中。请帮助将 xcom 变量传递给 S3ToRedshiftOperator。我们如何在不使用自定义功能的情况下推送 xcom?
错误:
NameError: name 'ti' is not defined
使用以下代码:
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftOperator
def export_db_fn(**kwargs):
session = settings.Session()
outkey = S3_KEY.format(MWAA_ENV_NAME, name[6:])
print(outkey)
s3_client.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
ti.xcom_push(key='FILE_PATH', value=outkey)
return "OK"
with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
export_info = PythonOperator(
task_id="export_info",
python_callable=export_db_fn,
provide_context=True
)
transfer_s3_to_redshift = S3ToRedshiftOperator(
s3_bucket=S3_BUCKET,
s3_key="{{ti.xcom_pull(key='FILE_PATH', task_ids='export_info')}}",
schema="dw_stage",
table=REDSHIFT_TABLE,
copy_options=['csv',"IGNOREHEADER 1"],
redshift_conn_id='redshift',
autocommit=True,
task_id='transfer_s3_to_redshift',
)
start >> export_info >> transfer_s3_to_redshift >> end
错误信息说明了问题所在。
ti
未定义。
当您设置 provide_context=True
时,Airflow 会在 python 可调用对象中为您提供上下文。其中一个属性是 ti
(参见 source code)。所以你需要从kwargs中提取它或者在函数签名中设置它。
您的代码应该是:
def export_db_fn(**kwargs):
...
ti = kwargs['ti']
ti.xcom_push(key='FILE_PATH', value=outkey)
...
或者如果你想直接使用ti
那么:
def export_db_fn(ti, **kwargs):
...
ti.xcom_push(key='FILE_PATH', value=outkey)
...
注意:在 Airflow >= 2.0 中不需要设置 provide_context=True
我正在使用 S3ToRedshiftOperator 将 csv 文件加载到 Redshift 数据库中。请帮助将 xcom 变量传递给 S3ToRedshiftOperator。我们如何在不使用自定义功能的情况下推送 xcom?
错误:
NameError: name 'ti' is not defined
使用以下代码:
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftOperator
def export_db_fn(**kwargs):
session = settings.Session()
outkey = S3_KEY.format(MWAA_ENV_NAME, name[6:])
print(outkey)
s3_client.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
ti.xcom_push(key='FILE_PATH', value=outkey)
return "OK"
with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
export_info = PythonOperator(
task_id="export_info",
python_callable=export_db_fn,
provide_context=True
)
transfer_s3_to_redshift = S3ToRedshiftOperator(
s3_bucket=S3_BUCKET,
s3_key="{{ti.xcom_pull(key='FILE_PATH', task_ids='export_info')}}",
schema="dw_stage",
table=REDSHIFT_TABLE,
copy_options=['csv',"IGNOREHEADER 1"],
redshift_conn_id='redshift',
autocommit=True,
task_id='transfer_s3_to_redshift',
)
start >> export_info >> transfer_s3_to_redshift >> end
错误信息说明了问题所在。
ti
未定义。
当您设置 provide_context=True
时,Airflow 会在 python 可调用对象中为您提供上下文。其中一个属性是 ti
(参见 source code)。所以你需要从kwargs中提取它或者在函数签名中设置它。
您的代码应该是:
def export_db_fn(**kwargs):
...
ti = kwargs['ti']
ti.xcom_push(key='FILE_PATH', value=outkey)
...
或者如果你想直接使用ti
那么:
def export_db_fn(ti, **kwargs):
...
ti.xcom_push(key='FILE_PATH', value=outkey)
...
注意:在 Airflow >= 2.0 中不需要设置 provide_context=True