自定义气流操作员:是否可以覆盖 __new__ 方法?
Custom Airflow Operator: Is it possible to overwrite __new__ method?
我恳请您帮助解决以下问题:
我尝试创建一个自定义 Airflow 运算符,它应该能够在 DAG 执行期间动态更改其配置(在我的例子中:它继承自的运算符)。为此,我试图覆盖 new 方法来修改必要的 class 属性。不幸的是,在 Airflow 中执行时,似乎没有参数传递给 new。这使我的实现变得不可能(我需要参数来检查条件)。
这是一个测试示例:
import logging
from datetime import datetime
from airflow import DAG
from airflow.models import BaseOperator
logger = logging.getLogger(__name__)
class TestOperator(BaseOperator):
def __new__(cls, *args, **kwargs):
logger.info("in new")
logger.info(f"kwargs: {kwargs}")
logger.info(f"args: {args}")
return super().__new__(cls)
def __init__(self, *args, **kwargs):
logger.info("in init")
super().__init__(*args, **kwargs)
def execute(self, context):
logger.info("in execute")
dag = DAG(
dag_id="d_dev__test__new_operator",
concurrency=1,
max_active_runs=1,
description="New Test",
schedule_interval=None,
start_date=datetime.now()
)
t1 = TestOperator(
dag=dag,
task_id="t_dev_1",
)
它产生以下输出:
[2021-09-15 08:43:17,140] {logging_mixin.py:104} INFO - Running <TaskInstance: d_dev__test__new_operator.t_dev_1 2021-09-15T08:43:00.506224+00:00 [running]> on host ddevtestnewoperatortdev1.63d7362bda96438e922e905320e847ce
[2021-09-15 08:43:17,199] {test_dummy_dag.py:16} INFO - in new
[2021-09-15 08:43:17,199] {test_dummy_dag.py:17} INFO - kwargs: {}
[2021-09-15 08:43:17,199] {test_dummy_dag.py:18} INFO - args: ()
[2021-09-15 08:43:17,829] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=d_dev__test__new_operator
AIRFLOW_CTX_TASK_ID=t_dev_1
AIRFLOW_CTX_EXECUTION_DATE=2021-09-15T08:43:00.506224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-15T08:43:00.506224+00:00
[2021-09-15 08:43:17,830] {test_dummy_dag.py:26} INFO - in execute
[2021-09-15 08:43:17,861] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=d_dev__test__new_operator, task_id=t_dev_1, execution_date=20210915T084300, start_date=20210915T084316, end_date=20210915T084317
[2021-09-15 08:43:17,924] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
如上所见,kwargs 的输出为空。我本来希望这是根据“标准”Python 行为工作的,其中来自构造函数的参数被传递到 new 方法。
我的问题是:
- 有谁知道如何让它工作?
- 如果不是,有谁知道为什么 not/cannot 有效?
- 令我感到奇怪的是 init 方法没有输出(应该是:“in init”),因为它肯定会在某个时候被调用,对吧?
在此先感谢您的帮助!
在 LocalExecutor 中执行的对象(任务)(我假设您使用的)通过多处理管道传递给 subprocessed,它在调度程序中腌制对象并在任务运行器中取消挑选它们。
这就是为什么 __new__
得到空参数而不调用 __init__
的原因 - unpickling 不知道构造函数中的参数,它将直接从对象的序列化表示中设置对象的属性.
https://docs.python.org/3/library/pickle.html#pickling-class-instances
您可以在 class 级别设置传递给 __new__
的参数(但这可能不是您想要的 - 请参阅 __getnewargs_ex__()
恐怕无能为力。
我恳请您帮助解决以下问题:
我尝试创建一个自定义 Airflow 运算符,它应该能够在 DAG 执行期间动态更改其配置(在我的例子中:它继承自的运算符)。为此,我试图覆盖 new 方法来修改必要的 class 属性。不幸的是,在 Airflow 中执行时,似乎没有参数传递给 new。这使我的实现变得不可能(我需要参数来检查条件)。
这是一个测试示例:
import logging
from datetime import datetime
from airflow import DAG
from airflow.models import BaseOperator
logger = logging.getLogger(__name__)
class TestOperator(BaseOperator):
def __new__(cls, *args, **kwargs):
logger.info("in new")
logger.info(f"kwargs: {kwargs}")
logger.info(f"args: {args}")
return super().__new__(cls)
def __init__(self, *args, **kwargs):
logger.info("in init")
super().__init__(*args, **kwargs)
def execute(self, context):
logger.info("in execute")
dag = DAG(
dag_id="d_dev__test__new_operator",
concurrency=1,
max_active_runs=1,
description="New Test",
schedule_interval=None,
start_date=datetime.now()
)
t1 = TestOperator(
dag=dag,
task_id="t_dev_1",
)
它产生以下输出:
[2021-09-15 08:43:17,140] {logging_mixin.py:104} INFO - Running <TaskInstance: d_dev__test__new_operator.t_dev_1 2021-09-15T08:43:00.506224+00:00 [running]> on host ddevtestnewoperatortdev1.63d7362bda96438e922e905320e847ce
[2021-09-15 08:43:17,199] {test_dummy_dag.py:16} INFO - in new
[2021-09-15 08:43:17,199] {test_dummy_dag.py:17} INFO - kwargs: {}
[2021-09-15 08:43:17,199] {test_dummy_dag.py:18} INFO - args: ()
[2021-09-15 08:43:17,829] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=d_dev__test__new_operator
AIRFLOW_CTX_TASK_ID=t_dev_1
AIRFLOW_CTX_EXECUTION_DATE=2021-09-15T08:43:00.506224+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-15T08:43:00.506224+00:00
[2021-09-15 08:43:17,830] {test_dummy_dag.py:26} INFO - in execute
[2021-09-15 08:43:17,861] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=d_dev__test__new_operator, task_id=t_dev_1, execution_date=20210915T084300, start_date=20210915T084316, end_date=20210915T084317
[2021-09-15 08:43:17,924] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
如上所见,kwargs 的输出为空。我本来希望这是根据“标准”Python 行为工作的,其中来自构造函数的参数被传递到 new 方法。
我的问题是:
- 有谁知道如何让它工作?
- 如果不是,有谁知道为什么 not/cannot 有效?
- 令我感到奇怪的是 init 方法没有输出(应该是:“in init”),因为它肯定会在某个时候被调用,对吧?
在此先感谢您的帮助!
在 LocalExecutor 中执行的对象(任务)(我假设您使用的)通过多处理管道传递给 subprocessed,它在调度程序中腌制对象并在任务运行器中取消挑选它们。
这就是为什么 __new__
得到空参数而不调用 __init__
的原因 - unpickling 不知道构造函数中的参数,它将直接从对象的序列化表示中设置对象的属性.
https://docs.python.org/3/library/pickle.html#pickling-class-instances
您可以在 class 级别设置传递给 __new__
的参数(但这可能不是您想要的 - 请参阅 __getnewargs_ex__()
恐怕无能为力。