Using dag_run.conf on a custom operator

I am using the Airflow 2 stable REST API to trigger a DAG. I have created a custom DAG that joins two MySQL tables on a key.

In the body of the API call I have to send parameters like the ones below, which determine which two tables get joined:

conf": {"database_1":"test","table_1":"student","key_1":"id","database_2": "test","table_2": "college","key_2": "student_id"},

Below is the implementation of the custom operator.

from typing import Dict, Iterable, Mapping, Optional, Union

from airflow.models.baseoperator import BaseOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.decorators import apply_defaults

class MySqlJoinOperator(BaseOperator):

    @apply_defaults
    def __init__(
        self,
        *,
        mysql_conn_id: str = 'mysql_default',
        parameters: Optional[Union[Mapping, Iterable]] = None,
        autocommit: bool = False,
        database_1: Optional[str] = None,
        table_1: Optional[str] = None,
        key_1: Optional[str] = None,
        database_2: Optional[str] = None,
        table_2: Optional[str] = None,
        key_2: Optional[str] = None,
        how: str = 'inner',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.autocommit = autocommit
        self.parameters = parameters
        self.database_1 = database_1
        self.table_1 = table_1
        self.key_1 = key_1
        self.database_2 = database_2
        self.table_2 = table_2
        self.key_2 = key_2
        self.how = how

    def execute(self, context: Dict):
        self.log.info('Joining %s.%s and %s.%s', self.database_1, self.table_1, self.database_2, self.table_2)
        # hook_1 = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database_1)
        # hook_2 = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database_2)
        # sql_1 = "select * from {}".format(self.table_1)
        # sql_2 = "select * from {}".format(self.table_2)
        # dataframe_1 = hook_1.get_pandas_df(sql_1)
        # dataframe_2 = hook_2.get_pandas_df(sql_2)
        # resultant_dataframe = dataframe_1.join(dataframe_2,how=self.how)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        sql = "select * from {}.{} as table_1 join {}.{} as table_2 on table_1.{} = table_2.{}".format(
            self.database_1, self.table_1, self.database_2, self.table_2, self.key_1, self.key_2)
        # get_records returns a list of tuples, not a DataFrame.
        resultant_rows = hook.get_records(sql)
        print(resultant_rows)
        return resultant_rows

And it would be used like this:

from airflow import DAG
from airflow.utils.dates import days_ago

from mysql_join import MySqlJoinOperator

dag = DAG(
    dag_id='test_mysql_join',
    schedule_interval=None,
    start_date=days_ago(10),
    tags=['test mysql join'],
)

test_mysql_operator = MySqlJoinOperator(
    task_id='join_test',
    mysql_conn_id="root_MYSQL_connection",
    database_1="{{ dag_run.conf['database_1'] }}",
    table_1="{{ dag_run.conf['table_1'] }}",
    key_1="{{ dag_run.conf['key_1'] }}",
    database_2="{{ dag_run.conf['database_2'] }}",
    table_2="{{ dag_run.conf['table_2'] }}",
    key_2="{{ dag_run.conf['key_2'] }}",
    dag=dag,
)

But the Jinja templating does not work here. Can anyone help me get this working?

Check the Airflow documentation on templating fields in custom operators here. I believe you just need to add those fields to template_fields.
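As a minimal sketch, only the conf-driven fields are listed below; the rest of the operator stays exactly as in the question:

from airflow.models.baseoperator import BaseOperator


class MySqlJoinOperator(BaseOperator):
    # Attributes named here are rendered as Jinja templates before
    # execute() runs, so "{{ dag_run.conf['table_1'] }}" resolves to the
    # value sent in the trigger request instead of staying a literal string.
    template_fields = ('database_1', 'table_1', 'key_1',
                       'database_2', 'table_2', 'key_2')

    # __init__ and execute unchanged from the question.

With that in place, the values passed via dag_run.conf are substituted into the templated strings at task run time, before execute() is called.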