自定义气流操作员:是否可以覆盖 __new__ 方法?

Custom Airflow Operator: Is it possible to overwrite __new__ method?

我恳请您帮助解决以下问题:

我尝试创建一个自定义 Airflow 运算符,它应该能够在 DAG 执行期间动态更改其配置(在我的例子中:它继承自的运算符)。为此,我试图覆盖 new 方法来修改必要的 class 属性。不幸的是,在 Airflow 中执行时,似乎没有参数传递给 new。这使我的实现变得不可能(我需要参数来检查条件)。

这是一个测试示例:

import logging
from datetime import datetime
from airflow import DAG
from airflow.models import BaseOperator

logger = logging.getLogger(__name__)


class TestOperator(BaseOperator):

    def __new__(cls, *args, **kwargs):
        logger.info("in new")
        logger.info(f"kwargs: {kwargs}")
        logger.info(f"args: {args}")
        return super().__new__(cls)

    def __init__(self, *args, **kwargs):
        logger.info("in init")
        super().__init__(*args, **kwargs)

    def execute(self, context):
        logger.info("in execute")


dag = DAG(
    dag_id="d_dev__test__new_operator",
    concurrency=1,
    max_active_runs=1,
    description="New Test",
    schedule_interval=None,
    start_date=datetime.now()
)


t1 = TestOperator(
    dag=dag,
    task_id="t_dev_1",
)

它产生以下输出:

[2021-09-15 08:43:17,140] {logging_mixin.py:104} INFO - Running <TaskInstance:   d_dev__test__new_operator.t_dev_1 2021-09-15T08:43:00.506224+00:00 [running]> on host  ddevtestnewoperatortdev1.63d7362bda96438e922e905320e847ce  
[2021-09-15 08:43:17,199] {test_dummy_dag.py:16} INFO - in new  
[2021-09-15 08:43:17,199] {test_dummy_dag.py:17} INFO - kwargs: {}  
[2021-09-15 08:43:17,199] {test_dummy_dag.py:18} INFO - args: ()  
[2021-09-15 08:43:17,829] {taskinstance.py:1302} INFO - Exporting the following env vars:  
AIRFLOW_CTX_DAG_OWNER=airflow  
AIRFLOW_CTX_DAG_ID=d_dev__test__new_operator  
AIRFLOW_CTX_TASK_ID=t_dev_1  
AIRFLOW_CTX_EXECUTION_DATE=2021-09-15T08:43:00.506224+00:00  
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-15T08:43:00.506224+00:00  
[2021-09-15 08:43:17,830] {test_dummy_dag.py:26} INFO - in execute  
[2021-09-15 08:43:17,861] {taskinstance.py:1211} INFO - Marking task as SUCCESS.   dag_id=d_dev__test__new_operator, task_id=t_dev_1, execution_date=20210915T084300,   start_date=20210915T084316, end_date=20210915T084317  
[2021-09-15 08:43:17,924] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check  

如上所见,kwargs 的输出为空。我本来希望这是根据“标准”Python 行为工作的,其中来自构造函数的参数被传递到 new 方法。

我的问题是:

  1. 有谁知道如何让它工作?
  2. 如果不是,有谁知道为什么 not/cannot 有效?
  3. 令我感到奇怪的是 init 方法没有输出(应该是:“in init”),因为它肯定会在某个时候被调用,对吧?

在此先感谢您的帮助!

在 LocalExecutor 中执行的对象(任务)(我假设您使用的)通过多处理管道传递给 subprocessed,它在调度程序中腌制对象并在任务运行器中取消挑选它们。

这就是为什么 __new__ 得到空参数而不调用 __init__ 的原因 - unpickling 不知道构造函数中的参数,它将直接从对象的序列化表示中设置对象的属性.

https://docs.python.org/3/library/pickle.html#pickling-class-instances

您可以在 class 级别设置传递给 __new__ 的参数(但这可能不是您想要的 - 请参阅 __getnewargs_ex__()

恐怕无能为力。