在 Airflow/ 上使用 PythonOperator 时如何使用 Python 函数的返回值

How to use the returned value of a Python Function when using a PythonOperator on Airflow/

我正在构建一个带有多个 PythonOperator 节点的气流 DAG。其中一个 returns 一个值,稍后将用作另一个运算符的参数。但是我该如何存储和访问这个返回值呢?

例如: 我有以下功能

def sum(a, b):
    return a + b

def compare(c, d):
   return c > d

以及以下日期:

sum = PythonOperator(
      task_id = 'sum',
      python_callable = sum,
      op_args = [a, b],
      dag = dag
      )

compare = PythonOperator(
     task_id = 'compare',
     python_callable = compare,
     op_args = [{VALUE}, c]
     dag = dag
     )

sum >> compare

我希望{VALUE}是执行求和节点时返回的值。我如何存储和访问它?

您可以尝试通过 XCOMs:

分享状态或结果
def sum(a, b, **context):
    result = a + b 
    context['task_instance'].xcom_push(key='result_of_sum', value=result)

def compare(c, d, **context):
   result_of_sum = context['task_instance'].xcom_pull(key='result_of_sum')
   return c > d

请不要忘记provide_context=True PythonOperator 参数