在 python class 内部气流中,值没有从一个函数传递到另一个函数

values are not getting passed from one function to another in python class inside airflow

我正在创建 python class 用于动态任务生成。对于第一个,我接受用户的输入并将该输入传递给动态任务创建部分,但我无法更新函数内部的 class 变量。在下面的代码 1st 中,我正在调用 accept_ip_cli 并期望此函数将更新 self.my_dict 变量,但是当我尝试在 test_dict_value() 中读取 self.my_dict 时,这并没有发生我我得到空字典。

class my_class:


    def __init__(self,
                 airflow_cfg, dag,
                 default_args,
                 *args,
                 **kwargs):


        self.master_config_yaml = ''
        self.my_dict = {}
        self.dag = dag
        self.default_args = default_args
        self.tasks = []
        self.config_values = []
        self.config_key_value = {}
        self.job_id = None

        self.insert_query = ""
        self.config_key = ['']

    # def read_yaml(path):
    #     with open(path, 'r') as f:
    #         df = pd.json_normalize(safe_load(f))
    #     return df

    def accept_ip_cli(self, **kwargs):
        scope = kwargs["dag_run"].conf.get("scope", "dm")
        pod = kwargs['dag_run'].conf.get('pod', 'acc')
        layer = kwargs['dag_run'].conf.get('layer', 'layer')
        pipeline_name = kwargs['dag_run'].conf.get('pipe', 'all')
        kpi = kwargs['dag_run'].conf.get('d', 'data')
        my_dict = {"scope": scope.lower().strip(), "pod": pod.lower().strip(), "layer": layer.lower().strip(),
                   "pipeline_name": pipeline_name.lower().strip(), "kpi": kpi}

        self.my_dict = my_dict
        return self.my_dict

    def test_dict_value(self, my_dictionary, **kwargs):
        print('Dict is', my_dictionary)

        return self.my_dict

    def call_accepted_ip(self):
        return PythonOperator(task_id="accept_input_from_cli",
                              provide_context=True,
                              python_callable=self.accept_ip_cli,
                              dag=self.dag)

    def test_dict(self):
        return PythonOperator(task_id="test_dict",
                              provide_context=True,
                              python_callable=self.test_dict_value,
                              dag=self.dag)


def get_json_var(AIRFLOW_VAR):
    aiflow_cfg = Variable.get(AIRFLOW_VAR, deserialize_json=True)
    return aiflow_cfg


# Function to create dags for the number of executions
def create_dag(pipeline_name):
    default_args = {
        'owner': 'Airflow'
    }
    dag_id = "recon_nik_poc2"

    dag = DAG(
        dag_id,
        default_args=default_args,
        catchup=False,
        max_active_runs=1
    )
    airflow_cfg = get_json_var(AIRFLOW_VAR)

    bq = my_class(airflow_cfg, dag=dag, default_args=default_args)
    start_flow = DummyOperator(task_id='start_flow', dag=dag)

    end_flow = DummyOperator(task_id='end_flow', dag=dag)

    start_flow >> bq.call_accepted_ip() >> bq.test_dict() >> end_flow
    return dag


pipelines_names = ['recon_nik_poc2']
for each in pipelines_names:
    dag_name = "recon_nik_poc2"
    globals()[dag_name] = create_dag(each)

我在这里缺少什么?

对于每个任务都会有一个 AuditReconciliationFW 的新实例,因此 bq.call_accepted_ip() 所做的不会传播到 bq.test_dict()。如果我正确理解您要实现的目标是将一个任务的值传递给另一个任务。为此,您必须使用 XCom,因为这是在任务之间共享状态的唯一正确方法。

要将值从一个函数传递到另一个函数,您可以使用 XCom。默认情况下,任务是隔离的,但 XCom 允许您相互通信。非常好用,简单易用