使用 XCom 在 类 之间交换数据?

Use XCom to exchange data between classes?

我有以下 DAG,它使用专用于数据预处理例程的 class 执行不同的方法:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('MARKETING_PREPROC_PATH')

if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    from table_builder import OnlineOfflinePreprocess
else:
    print('Define MARKETING_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
  'start_date': datetime.now(),
  'max_active_runs': 1,
  'concurrency': 4
}

worker = OnlineOfflinePreprocess()

DAG = DAG(
  dag_id='marketing_data_preproc',
  default_args=default_args,
  start_date=datetime.today()
)

import_online_data = PythonOperator(
  task_id='import_online_data',
  python_callable=worker.import_online_data,
  dag=DAG)

import_offline_data = PythonOperator(
  task_id='import_offline_data',
  python_callable=worker.import_offline_data,
  dag=DAG)

merge_aurum_to_sherlock = PythonOperator(
  task_id='merge_aurum_to_sherlock',
  python_callable=worker.merge_aurum_to_sherlock,
  dag=DAG)

merge_sherlock_to_aurum = PythonOperator(
   task_id='merge_sherlock_to_aurum',
   python_callable=worker.merge_sherlock_to_aurum,
   dag=DAG)

upload_au_to_sh = PythonOperator(
  task_id='upload_au_to_sh',
  python_callable=worker.upload_table,
  op_args='aurum_to_sherlock',
  dag=DAG)

upload_sh_to_au = PythonOperator(
  task_id='upload_sh_to_au',
  python_callable=worker.upload_table,
  op_args='sherlock_to_aurum',
  dag=DAG)

import_online_data >> merge_aurum_to_sherlock
import_offline_data >> merge_aurum_to_sherlock

merge_aurum_to_sherlock >> merge_sherlock_to_aurum
merge_aurum_to_sherlock >> upload_au_to_sh
merge_sherlock_to_aurum >> upload_sh_to_au

这会产生以下错误:

[2017-09-07 19:32:09,587] {base_task_runner.py:97} INFO - Subtask: AttributeError: 'OnlineOfflinePreprocess' object has no attribute 'online_info'

考虑到气流的工作原理,这实际上非常明显:调用的不同 class 方法的输出不会存储到在图表顶部初始化的全局 class 对象中。

我可以用 XCom 解决这个问题吗?总的来说,关于如何将 OOP 的连贯性与 Airflow 融合在一起的想法是什么?

关于气流的 OOP 问题不大,更多的是关于气流状态的问题。

需要在任务之间传递的任何状态都需要持久存储。这是因为每个 airflow 任务都是一个独立的进程(甚至可以 运行 在不同的机器上!)因此内存中的通信是不可能的。

你是对的,你可以使用 XCOM 传递这个状态(如果它很小,因为它存储在气流数据库中)。如果它很大,您可能想将它存储在其他地方,可能是文件系统或 S3 或 HDFS 或专用数据库。