Airflow:如何在函数中使用触发器参数
Airflow: how to use trigger parameters in functions
我们正在使用 Airflow 的 KubernetesPodOperator
作为我们的数据管道。我们想要添加的是通过 UI.
传递参数的选项
我们目前使用它的方式是我们有不同的 yaml 文件存储运算符的参数,而不是直接调用运算符,我们调用一个函数来做一些准备和 returns像这样的运算符:
def prep_kubernetes_pod_operator(yaml):
# ... read yaml and extract params
return KubernetesPodOperator(params)
with DAG(...):
task1 = prep_kubernetes_pod_operator(yaml)
对我们来说这很有效,我们可以让我们的 dag 文件非常轻量级,但是现在我们想添加我们可以通过 UI 添加一些额外参数的功能。我知道触发器参数可以通过 kwargs['dag_run'].conf
访问,但我没有成功地将它们拉入 Python 函数。
我尝试的另一件事是创建一个自定义运算符,因为它可以识别 args,但我无法在执行部分调用 KubernetesPodOperator
(我想在运算符中调用运算符是无论如何都不是正确的解决方案。
更新:
听从 NicoE 的建议,我开始扩展 KubernetesPodOperator
。
我现在遇到的错误是,当我解析 yaml 并在之后分配参数时,父参数变成元组并引发类型错误。
达格:
task = NewKPO(
task_id="task1",
yaml_path=yaml_path)
操作员:
class NewKPO(KubernetesPodOperator):
@apply_defaults
def __init__(
self,
yaml_path: str,
name: str = "default",
*args,
**kwargs) -> None:
self.yaml_path = yaml_path
self.name = name
super(NewKPO, self).__init__(
name=name, # DAG is not parsed without this line - 'key has to be string'
*args,
**kwargs)
def execute(self, context):
# parsing yaml and adding context["dag_run"].conf (...)
self.name = yaml.name
self.image = yaml.image
self.secrets = yaml.secrets
#(...) if i run a type(self.secrets) here I will get tuple
return super(NewKPO, self).execute(context)
您可以使用 params
,这是一个可以在 DAG 级别参数定义的字典,并且在每个任务中都可以访问。适用于从 BaseOperator
派生的每个运算符,也可以从 UI.
设置
以下示例显示了如何将其与不同的运算符一起使用。 params
可以在 default_args
字典中定义或作为 DAG 对象的参数。
default_args = {
"owner": "airflow",
'params': {
"param1": "first_param",
"param2": "second_param"
}
}
dag = DAG(
dag_id="example_dag_params",
default_args=default_args,
start_date=days_ago(1),
schedule_interval="@once",
tags=['example_dags'],
catchup=False
)
从 UI 触发此 DAG 时,您可以添加一个额外的参数:
可以在模板化字段中访问参数,如 BashOperator
案例:
with dag:
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo bash_task: {{ params.param1 }}')
bash_task
日志输出:
{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
参数可在执行上下文中访问,如 python_callable
:
def _print_params(**kwargs):
print(f"Task_id: {kwargs['ti'].task_id}")
for k, v in kwargs['params'].items():
print(f"{k}:{v}")
python_task = PythonOperator(
task_id='python_task',
python_callable=_print_params,
)
输出:
{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
您还可以在任务级别定义中添加参数:
python_task_2 = PythonOperator(
task_id='python_task_2',
python_callable=_print_params,
params={'param4': 'param defined at task level'}
)
输出:
{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
按照示例,您可以定义一个继承自 BaseOperator
:
的自定义运算符
class CustomDummyOperator(BaseOperator):
@apply_defaults
def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
self.custom_arg = custom_arg
super(CustomDummyOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print(f"Task_id: {self.task_id}")
for k, v in context['params'].items():
print(f"{k}:{v}")
一个示例任务是:
custom_op_task = CustomDummyOperator(
task_id='custom_operator_task'
)
输出:
{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
进口:
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
希望对你有用!
我们正在使用 Airflow 的 KubernetesPodOperator
作为我们的数据管道。我们想要添加的是通过 UI.
我们目前使用它的方式是我们有不同的 yaml 文件存储运算符的参数,而不是直接调用运算符,我们调用一个函数来做一些准备和 returns像这样的运算符:
def prep_kubernetes_pod_operator(yaml):
# ... read yaml and extract params
return KubernetesPodOperator(params)
with DAG(...):
task1 = prep_kubernetes_pod_operator(yaml)
对我们来说这很有效,我们可以让我们的 dag 文件非常轻量级,但是现在我们想添加我们可以通过 UI 添加一些额外参数的功能。我知道触发器参数可以通过 kwargs['dag_run'].conf
访问,但我没有成功地将它们拉入 Python 函数。
我尝试的另一件事是创建一个自定义运算符,因为它可以识别 args,但我无法在执行部分调用 KubernetesPodOperator
(我想在运算符中调用运算符是无论如何都不是正确的解决方案。
更新:
听从 NicoE 的建议,我开始扩展 KubernetesPodOperator
。
我现在遇到的错误是,当我解析 yaml 并在之后分配参数时,父参数变成元组并引发类型错误。
达格:
task = NewKPO(
task_id="task1",
yaml_path=yaml_path)
操作员:
class NewKPO(KubernetesPodOperator):
@apply_defaults
def __init__(
self,
yaml_path: str,
name: str = "default",
*args,
**kwargs) -> None:
self.yaml_path = yaml_path
self.name = name
super(NewKPO, self).__init__(
name=name, # DAG is not parsed without this line - 'key has to be string'
*args,
**kwargs)
def execute(self, context):
# parsing yaml and adding context["dag_run"].conf (...)
self.name = yaml.name
self.image = yaml.image
self.secrets = yaml.secrets
#(...) if i run a type(self.secrets) here I will get tuple
return super(NewKPO, self).execute(context)
您可以使用 params
,这是一个可以在 DAG 级别参数定义的字典,并且在每个任务中都可以访问。适用于从 BaseOperator
派生的每个运算符,也可以从 UI.
以下示例显示了如何将其与不同的运算符一起使用。 params
可以在 default_args
字典中定义或作为 DAG 对象的参数。
default_args = {
"owner": "airflow",
'params': {
"param1": "first_param",
"param2": "second_param"
}
}
dag = DAG(
dag_id="example_dag_params",
default_args=default_args,
start_date=days_ago(1),
schedule_interval="@once",
tags=['example_dags'],
catchup=False
)
从 UI 触发此 DAG 时,您可以添加一个额外的参数:
可以在模板化字段中访问参数,如 BashOperator
案例:
with dag:
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo bash_task: {{ params.param1 }}')
bash_task
日志输出:
{bash.py:158} INFO - Running command: echo bash_task: first_param
{bash.py:169} INFO - Output:
{bash.py:173} INFO - bash_task: first_param
{bash.py:177} INFO - Command exited with return code 0
参数可在执行上下文中访问,如 python_callable
:
def _print_params(**kwargs):
print(f"Task_id: {kwargs['ti'].task_id}")
for k, v in kwargs['params'].items():
print(f"{k}:{v}")
python_task = PythonOperator(
task_id='python_task',
python_callable=_print_params,
)
输出:
{logging_mixin.py:104} INFO - Task_id: python_task
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
您还可以在任务级别定义中添加参数:
python_task_2 = PythonOperator(
task_id='python_task_2',
python_callable=_print_params,
params={'param4': 'param defined at task level'}
)
输出:
{logging_mixin.py:104} INFO - Task_id: python_task_2
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param4:param defined at task level
{logging_mixin.py:104} INFO - param3:param_from_the_UI
按照示例,您可以定义一个继承自 BaseOperator
:
class CustomDummyOperator(BaseOperator):
@apply_defaults
def __init__(self, custom_arg: str = 'default', *args, **kwargs) -> None:
self.custom_arg = custom_arg
super(CustomDummyOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print(f"Task_id: {self.task_id}")
for k, v in context['params'].items():
print(f"{k}:{v}")
一个示例任务是:
custom_op_task = CustomDummyOperator(
task_id='custom_operator_task'
)
输出:
{logging_mixin.py:104} INFO - Task_id: custom_operator_task
{logging_mixin.py:104} INFO - custom_arg: default
{logging_mixin.py:104} INFO - param1:first_param
{logging_mixin.py:104} INFO - param2:second_param
{logging_mixin.py:104} INFO - param3:param_from_the_UI
进口:
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.decorators import apply_defaults
希望对你有用!