Unable to pass xcom in Custom Operators in Airflow
I have a simple linear DAG (created with Airflow 2.0) with two tasks. I have custom operators for each task, which extend BaseOperator. Here is the code for the DAG and the operators:
class Operator1(BaseOperator):
    @apply_defaults
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, context):
        ...
        logging.info('First task')
        context['task_instance'].xcom_push(key="payload", value=data)
        return data

class Operator2(BaseOperator):
    @apply_defaults
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)

    def execute(self, context):
        ...
        logging.info("context is ", context)
        parameters = context['task_instance'].xcom_pull(key="payload", value=data)
with DAG('dag_1', default_args=DEFAULT_ARGS, schedule_interval=None) as dag:
    TASK_1 = Operator1(
        task_id='task_1',
        do_xcom_push=True)
    TASK_2 = Operator2(
        task_id='task_2',
        do_xcom_push=True)
    TASK_1 >> TASK_2
When I run the DAG, I find that the context used to fetch the xcom value is empty. I searched through many answers on Stack Overflow and tried the approaches mentioned there, but none of them worked.
I would really appreciate some hints on this problem - how do I push and pull xcom values in custom operators?
I took your code and ran it; the first problem is that start_date is not defined, so it ends with an exception:
Exception has occurred: AirflowException (note: full exception trace is shown but execution is paused at: _run_module_as_main)
Task is missing the start_date parameter
Also, in the Operator1 class, the data variable is not defined. I assume you left these out while putting the code sample together.
Other than that, the code works, but I think you should consider setting the task_ids parameter when performing the xcom_pull operation.
From the TaskInstance.xcom_pull method docstring:
:param task_ids: Only XComs from tasks with matching ids will be
pulled. Can pass None to remove the filter.
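To see why that filter matters, here is a minimal plain-Python sketch of the idea (this is not Airflow code - FakeTaskInstance and its in-memory store are made up for illustration; real XComs live in the metadata database, keyed by dag id, task id, and key):

```python
class FakeTaskInstance:
    """Toy stand-in for a task instance, to illustrate XCom keying only."""
    _store = {}  # (task_id, key) -> value, shared across "tasks"

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        if task_ids is not None:
            # With the filter: only values pushed by that task id match.
            return self._store.get((task_ids, key))
        # Without the filter: a value pushed by any task may come back.
        matches = [v for (tid, k), v in self._store.items() if k == key]
        return matches[-1] if matches else None

t1 = FakeTaskInstance("task_1")
t1.xcom_push(key="payload", value="valuable_data")

t2 = FakeTaskInstance("task_2")
print(t2.xcom_pull(task_ids="task_1", key="payload"))  # valuable_data
print(t2.xcom_pull(task_ids="task_9", key="payload"))  # None: that task pushed nothing
```

Passing task_ids pins the pull to the producing task, which avoids surprises once several tasks push under the same key.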
Here is the code of a working example; note that I use two equivalent ways to perform the XCom operations:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
from airflow.models import BaseOperator

class Operator1(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs) -> None:
        super(Operator1, self).__init__(*args, **kwargs)

    def execute(self, context):
        print('First task')
        data = "valuable_data"
        more_data = "more_valueable_data"
        context['task_instance'].xcom_push(key="payload", value=data)
        self.xcom_push(context, "more_data", more_data)
        return data

class Operator2(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs) -> None:
        super(Operator2, self).__init__(*args, **kwargs)

    def execute(self, context):
        # print(f"context is {context}")
        data = context['task_instance'].xcom_pull(
            "task_1",
            key="payload")
        more_data = self.xcom_pull(context, "task_1", key="more_data")
        print(f"Obtained data: {data}")
        print(f"Obtained more_data: {more_data}")

with DAG('dag_1',
         default_args={'owner': 'airflow'},
         start_date=days_ago(1),
         catchup=False,
         schedule_interval=None) as dag:

    TASK_1 = Operator1(
        task_id='task_1'
    )
    TASK_2 = Operator2(
        task_id='task_2'
    )
    TASK_1 >> TASK_2
Logs from task_2:
[2021-06-15 12:55:01,206] {taskinstance.py:1255} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dag_1
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2021-06-14T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-06-14T00:00:00+00:00
Obtained data: valuable_data
Obtained more_data: more_valueable_data
[2021-06-15 12:55:01,227] {taskinstance.py:1159} INFO - Marking task as SUCCESS. dag_id=dag_1, task_id=task_2, execution_date=20210614T000000, start_date=20210615T120402, end_date=20210615T125501
Side note: I changed the __init__ methods to also accept *args. I am using print here, but you could use the Airflow logger instead, as self.log.info('msg').
Let me know if this works for you!
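One more remark on the `return data` line: when an operator's execute returns a value (and do_xcom_push is left at its default of True), Airflow pushes that value automatically under the key "return_value", which is also the default key for xcom_pull. A plain-Python sketch of that convention (again using a made-up in-memory dict, not the Airflow API):

```python
XCOM_RETURN_KEY = "return_value"  # the actual default key name Airflow uses

store = {}  # (task_id, key) -> value; stand-in for the metadata database

def run_task(task_id, execute_fn, do_xcom_push=True):
    """Mimic the part of task execution that auto-pushes the return value."""
    result = execute_fn()
    if do_xcom_push and result is not None:
        store[(task_id, XCOM_RETURN_KEY)] = result
    return result

run_task("task_1", lambda: "valuable_data")

# A pull that gives only task_ids and no key falls back to "return_value",
# so it retrieves whatever execute() returned:
print(store[("task_1", XCOM_RETURN_KEY)])  # valuable_data
```

This is why `ti.xcom_pull(task_ids='task_1')` with no key argument would also have worked in Operator2 to fetch the returned data.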