使用插件导入 DAG 时出现气流错误 - 只能在 Operator 之间设置关系
Airflow error importing DAG using plugin - Relationships can only be set between Operators
我编写了一个仅包含一个自定义运算符的气流插件(以支持 BigQuery 中的 CMEK)。我可以创建一个简单的 DAG,其中包含一个使用此运算符且执行良好的任务。
但是,如果我尝试在 DAG 中创建从 DummyOperator 任务到我的自定义操作员任务的依赖项,DAG 无法在 UI 中加载并抛出以下错误,我无法理解为什么会这样正在抛出错误?
Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator
到目前为止,我已经在 composer-1.4.2-airflow-1.9.0、composer-1.4.2-airflow-1.10.0 和 composer-1.4.1-airflow-1.10.0 上进行了测试。
运行 每个任务的气流测试均已完成且没有错误。
在 DAG 中单独使用它效果很好(如下所示)所以我认为插件没有任何内在的错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
run_this = BQCMEKOperator(
task_id = 'cmek_plugin_test',
sql = 'select * from ds.foo LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test10',
key = 'xxx',
dag = dag
)
而如果我引入 DummyOperator 和依赖项,则会发生错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin_v2',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
etl_start = DummyOperator(task_id='etl_start', dag=dag)
extract = BQCMEKOperator(
task_id = 'extract',
sql = 'select * from foo.bar LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test5',
key = 'xxx',
dag = dag
)
etl_start.set_downstream(extract)
运算符本身很简单,我可以使用最简单的自定义运算符重现该问题,例如下面的运算符
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class TestOperator(BaseOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(TestOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info("Executed by TestOperator")
在init.py
中有如下插件定义
from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator
class TestPlugin(AirflowPlugin):
name = "test_plugin"
operators = [TestOperator]
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
还查看了 models.py 中生成此错误的气流代码,它使用 isinstance(t, BaseOperator) 并且当我只是 运行 它在 python 所以我不知道发生了什么事?
for t in task_list:
if not isinstance(t, BaseOperator):
raise AirflowException(
"Relationships can only be set between "
"Operators; received {}".format(t.__class__.__name__))
composer-1.4.2 版本中引入了一个错误,我们现在已经修复了,尝试创建一个新的 Composer 环境,DAG 错误应该会消失。同时,我们还将在接下来的几天内将该修复程序自动应用到现有的 1.4.2 环境中。
我编写了一个仅包含一个自定义运算符的气流插件(以支持 BigQuery 中的 CMEK)。我可以创建一个简单的 DAG,其中包含一个使用此运算符且执行良好的任务。
但是,如果我尝试在 DAG 中创建从 DummyOperator 任务到我的自定义操作员任务的依赖项,DAG 无法在 UI 中加载并抛出以下错误,我无法理解为什么会这样正在抛出错误?
Broken DAG: [/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py] Relationships can only be set between Operators; received BQCMEKOperator
到目前为止,我已经在 composer-1.4.2-airflow-1.9.0、composer-1.4.2-airflow-1.10.0 和 composer-1.4.1-airflow-1.10.0 上进行了测试。
运行 每个任务的气流测试均已完成且没有错误。
在 DAG 中单独使用它效果很好(如下所示)所以我认为插件没有任何内在的错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
run_this = BQCMEKOperator(
task_id = 'cmek_plugin_test',
sql = 'select * from ds.foo LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test10',
key = 'xxx',
dag = dag
)
而如果我引入 DummyOperator 和依赖项,则会发生错误
import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from airflow.operators.dummy_operator import DummyOperator
default_dag_args = {
'start_date': datetime.datetime(2019,1,1),
'retries': 0
}
dag = DAG(
'js_bq_custom_plugin_v2',
schedule_interval=None,
catchup=False,
concurrency=1,
max_active_runs=1,
default_args=default_dag_args)
etl_start = DummyOperator(task_id='etl_start', dag=dag)
extract = BQCMEKOperator(
task_id = 'extract',
sql = 'select * from foo.bar LIMIT 15',
project = 'xxx',
dataset = 'js_dev',
table = 'cmek_test5',
key = 'xxx',
dag = dag
)
etl_start.set_downstream(extract)
运算符本身很简单,我可以使用最简单的自定义运算符重现该问题,例如下面的运算符
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class TestOperator(BaseOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(TestOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info("Executed by TestOperator")
在init.py
中有如下插件定义from airflow.plugins_manager import AirflowPlugin
from test_plugin.operators.test_operator import TestOperator
class TestPlugin(AirflowPlugin):
name = "test_plugin"
operators = [TestOperator]
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
还查看了 models.py 中生成此错误的气流代码,它使用 isinstance(t, BaseOperator) 并且当我只是 运行 它在 python 所以我不知道发生了什么事?
for t in task_list:
if not isinstance(t, BaseOperator):
raise AirflowException(
"Relationships can only be set between "
"Operators; received {}".format(t.__class__.__name__))
composer-1.4.2 版本中引入了一个错误,我们现在已经修复了,尝试创建一个新的 Composer 环境,DAG 错误应该会消失。同时,我们还将在接下来的几天内将该修复程序自动应用到现有的 1.4.2 环境中。