Airflow XCOM KeyError: 'task_instance'
Airflow XCOM KeyError: 'task_instance'
我正在尝试设置一个动态序列的 ETL 作业,它会通过 XCom 从最先运行的那个任务中获取数据。这是当前的代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
# Module-level time anchors. Everything here is evaluated when the file is
# parsed, so the values follow the parse-time clock, not a particular DAG run.
START_DT = dt.combine(date.today(), dt.min.time())  # today at 00:00:00
END_DT = dt.combine(date.today(), dt.max.time())  # today at 23:59:59.999999
NOW = dt.now()
# Jinja placeholder: stays a literal string until a templated field renders it.
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = date.today().strftime("%m%d")  # zero-padded month + day
def datetime_range(start, end, delta):
    """Yield datetimes from ``start`` up to, but not including, ``end``.

    Args:
        start: First datetime to yield.
        end: Exclusive upper bound.
        delta: Step between values, either a ``timedelta`` or a dict of
            ``timedelta`` keyword arguments such as ``{'minutes': 30}``.

    Raises:
        ValueError: If values would be produced (``start < end``) but the
            step is zero or negative, which would otherwise never terminate.
    """
    step = delta if isinstance(delta, td) else td(**delta)
    current = start
    # Only reject a non-positive step when the loop would actually run, so
    # every call that used to terminate keeps behaving exactly as before.
    if current < end and step <= td(0):
        raise ValueError('delta must be a positive duration, got %r' % (step,))
    while current < end:
        yield current
        current += step
# Arguments handed to the DAG as defaults for the tasks created in it.
# NOTE(review): start_date comes from START_DT, which is recomputed from
# today's date every time this file is parsed.
default_args = dict(
    owner='test',
    depends_on_past=False,
    start_date=START_DT,
    email=['test@test.com'],
    email_on_failure=False,
    email_on_retry=False,
    queue='etl',
    retries=1,
    retry_delay=td(minutes=1),
)
dag_name = 'SEQ_TEST_01'
# Scheduled with a 30-minute interval.
dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=td(minutes=30),
)
# Callable behind the 'seq_sensor' task: steps through START_DT..END_DT in
# 30-minute slots (numbered from 1) and derives a sequence number from sq_dt.
# NOTE(review): indentation was lost in this listing, so whether
# `return sequence` sits inside the `if`, inside the loop, or after the loop
# cannot be determined from here -- confirm against the original file.
def seq_job(sq_dt, **kwargs):
for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1):
# sq_dt is compared with the string form of the slot start, not as a datetime.
if sq_dt < str(dt_in):
# (slot number, slot start, slot start + 29m59s); only the number is used below.
curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
sequence = int(curr_seq[0])
return sequence
# Task 'seq_sensor': runs seq_job with sq_dt set to the CURRENT_EXEC
# placeholder string.
# NOTE(review): whether op_kwargs values are Jinja-rendered depends on the
# Airflow version in use -- confirm that sq_dt arrives as a rendered date.
pycall = PythonOperator(
    task_id='seq_sensor',
    python_callable=seq_job,
    op_kwargs={'sq_dt': CURRENT_EXEC},
    provide_context=True,
    dag=dag,
)
# Builds and returns one BashOperator for the given ETL group number.
# NOTE(review): the module-level calls group(1), group(2), group(3) run while
# the file is being parsed and pass only `grp`, so `context` is an empty dict
# and the lookup on the next line raises KeyError: 'task_instance' -- the
# error shown in the traceback.
def group(grp, **context):
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
# Zero-pad the group number to two digits (1 -> '01').
grp = '%0.2d' % grp
database = 'TEST'
# Jinja placeholder string, embedded as-is into bash_command below.
today_date = '{{ ds_nodash }}'
return BashOperator(
# NOTE(review): `gap` is not defined anywhere in the visible code --
# presumably `grp` was intended; confirm.
task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap),
bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp),
dag=dag)
# Final DummyOperator task that every ETL group task feeds into.
complete = DummyOperator(
task_id='All_Sequences_complete',
dag=dag)
# Wire seq_sensor -> ETL group task -> All_Sequences_complete for groups 1-3.
# Each group(...) call executes right here, at parse time, to create its task.
pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete
问题是无论我尝试什么,我都会不断收到此错误:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
pycall >> group(1) >> complete
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'
不确定是我遗漏了某个小细节,还是整个思路都错了。我还是 Airflow 的新手,正在尝试让我们的 ETL 测试环境每 30 分钟运行一次,并为每次运行分配一个由 datetime_range 生成、基于 execution_date 变量的唯一序列号。
请尝试使用 context['ti']
。
我的解决办法是把 BashOperator 移到另一个函数中,并通过如下的 Jinja 模板从 PythonOperator 的 XCom 中取值:
def bash_out(group, **kwargs):
    """Create the BashOperator that runs ``script.sh`` for one ETL group.

    The sequence number is not looked up while the DAG file is parsed.
    Instead a Jinja expression is embedded in ``bash_command``, so the XCom
    value pushed by the ``seq_sensor`` task is pulled when the command is
    rendered for a task run.

    Args:
        group: Group label used in the task id and passed to the script,
            e.g. ``'01'``.
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        The BashOperator attached to the module-level ``dag``.
    """
    # Defined locally so the snippet does not depend on names that only
    # existed inside the original group() function.
    database = 'TEST'
    today_date = '{{ ds_nodash }}'
    sequence = "{{ task_instance.xcom_pull(task_ids='seq_sensor') }}"
    return BashOperator(
        task_id='ETL_{}_GRP{}'.format(database, group),
        bash_command='script.sh {} {} {} {}'.format(today_date, database, sequence, group),
        dag=dag)
并设置依赖关系:
# Fan out from seq_sensor to the three ETL group tasks, then join on the
# final task.
pycall >> [
    bash_out('01'),
    bash_out('02'),
    bash_out('03'),
] >> complete
确保 'provide_context': True
出现在 default_args
中。
我正在尝试设置一个动态序列的 ETL 作业,它会通过 XCom 从最先运行的那个任务中获取数据。这是当前的代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
# Module-level time anchors. Everything here is evaluated when the file is
# parsed, so the values follow the parse-time clock, not a particular DAG run.
START_DT = dt.combine(date.today(), dt.min.time())  # today at 00:00:00
END_DT = dt.combine(date.today(), dt.max.time())  # today at 23:59:59.999999
NOW = dt.now()
# Jinja placeholder: stays a literal string until a templated field renders it.
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = date.today().strftime("%m%d")  # zero-padded month + day
def datetime_range(start, end, delta):
    """Yield datetimes from ``start`` up to, but not including, ``end``.

    Args:
        start: First datetime to yield.
        end: Exclusive upper bound.
        delta: Step between values, either a ``timedelta`` or a dict of
            ``timedelta`` keyword arguments such as ``{'minutes': 30}``.

    Raises:
        ValueError: If values would be produced (``start < end``) but the
            step is zero or negative, which would otherwise never terminate.
    """
    step = delta if isinstance(delta, td) else td(**delta)
    current = start
    # Only reject a non-positive step when the loop would actually run, so
    # every call that used to terminate keeps behaving exactly as before.
    if current < end and step <= td(0):
        raise ValueError('delta must be a positive duration, got %r' % (step,))
    while current < end:
        yield current
        current += step
# Arguments handed to the DAG as defaults for the tasks created in it.
# NOTE(review): start_date comes from START_DT, which is recomputed from
# today's date every time this file is parsed.
default_args = dict(
    owner='test',
    depends_on_past=False,
    start_date=START_DT,
    email=['test@test.com'],
    email_on_failure=False,
    email_on_retry=False,
    queue='etl',
    retries=1,
    retry_delay=td(minutes=1),
)
dag_name = 'SEQ_TEST_01'
# Scheduled with a 30-minute interval.
dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=td(minutes=30),
)
# Callable behind the 'seq_sensor' task: steps through START_DT..END_DT in
# 30-minute slots (numbered from 1) and derives a sequence number from sq_dt.
# NOTE(review): indentation was lost in this listing, so whether
# `return sequence` sits inside the `if`, inside the loop, or after the loop
# cannot be determined from here -- confirm against the original file.
def seq_job(sq_dt, **kwargs):
for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1):
# sq_dt is compared with the string form of the slot start, not as a datetime.
if sq_dt < str(dt_in):
# (slot number, slot start, slot start + 29m59s); only the number is used below.
curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
sequence = int(curr_seq[0])
return sequence
# Task 'seq_sensor': runs seq_job with sq_dt set to the CURRENT_EXEC
# placeholder string.
# NOTE(review): whether op_kwargs values are Jinja-rendered depends on the
# Airflow version in use -- confirm that sq_dt arrives as a rendered date.
pycall = PythonOperator(
    task_id='seq_sensor',
    python_callable=seq_job,
    op_kwargs={'sq_dt': CURRENT_EXEC},
    provide_context=True,
    dag=dag,
)
# Builds and returns one BashOperator for the given ETL group number.
# NOTE(review): the module-level calls group(1), group(2), group(3) run while
# the file is being parsed and pass only `grp`, so `context` is an empty dict
# and the lookup on the next line raises KeyError: 'task_instance' -- the
# error shown in the traceback.
def group(grp, **context):
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
# Zero-pad the group number to two digits (1 -> '01').
grp = '%0.2d' % grp
database = 'TEST'
# Jinja placeholder string, embedded as-is into bash_command below.
today_date = '{{ ds_nodash }}'
return BashOperator(
# NOTE(review): `gap` is not defined anywhere in the visible code --
# presumably `grp` was intended; confirm.
task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap),
bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp),
dag=dag)
# Final DummyOperator task that every ETL group task feeds into.
complete = DummyOperator(
task_id='All_Sequences_complete',
dag=dag)
# Wire seq_sensor -> ETL group task -> All_Sequences_complete for groups 1-3.
# Each group(...) call executes right here, at parse time, to create its task.
pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete
问题是无论我尝试什么,我都会不断收到此错误:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
pycall >> group(1) >> complete
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'
不确定是我遗漏了某个小细节,还是整个思路都错了。我还是 Airflow 的新手,正在尝试让我们的 ETL 测试环境每 30 分钟运行一次,并为每次运行分配一个由 datetime_range 生成、基于 execution_date 变量的唯一序列号。
请尝试使用 context['ti']
。
我的解决办法是把 BashOperator 移到另一个函数中,并通过如下的 Jinja 模板从 PythonOperator 的 XCom 中取值:
def bash_out(group, **kwargs):
    """Create the BashOperator that runs ``script.sh`` for one ETL group.

    The sequence number is not looked up while the DAG file is parsed.
    Instead a Jinja expression is embedded in ``bash_command``, so the XCom
    value pushed by the ``seq_sensor`` task is pulled when the command is
    rendered for a task run.

    Args:
        group: Group label used in the task id and passed to the script,
            e.g. ``'01'``.
        **kwargs: Accepted for call compatibility; not used.

    Returns:
        The BashOperator attached to the module-level ``dag``.
    """
    # Defined locally so the snippet does not depend on names that only
    # existed inside the original group() function.
    database = 'TEST'
    today_date = '{{ ds_nodash }}'
    sequence = "{{ task_instance.xcom_pull(task_ids='seq_sensor') }}"
    return BashOperator(
        task_id='ETL_{}_GRP{}'.format(database, group),
        bash_command='script.sh {} {} {} {}'.format(today_date, database, sequence, group),
        dag=dag)
并设置依赖关系:
# Fan out from seq_sensor to the three ETL group tasks, then join on the
# final task.
pycall >> [
    bash_out('01'),
    bash_out('02'),
    bash_out('03'),
] >> complete
确保 'provide_context': True
出现在 default_args
中。