Airflow 运算符和 dags 以及正确返回、公开和访问值?
Airflow operator and dags and properly returning, exposing, and accessing values?
我需要创建一个气流运算符,它需要一些输入和 returns 一个字符串,该字符串将用作另一个运算符的输入,接下来 运行。我是 airflow dags 和操作员的新手,对如何正确执行此操作感到困惑。由于我是为使用气流和构建 dags 的人构建这个,而且我不是真正的气流用户或 dag 开发人员,所以我想获得有关正确执行它的建议。我创建了一个运算符,它 returns 是一个标记(只是一个字符串,所以 hello world 运算符示例可以正常工作)。这样做我在 dag 执行的 xcom 值中看到了值。但是我如何正确地检索该值并将其输入到下一个运算符中呢?对于我的示例,我只是调用了同一个操作员,但实际上它将调用不同的操作员。我只是不知道如何正确编码。我只是向 dag 添加代码吗?运营商是否需要添加代码?还是应该做其他事情?
示例 Dag:
import logging
import os
from airflow import DAG
from airflow.utils.dates import days_ago
from custom_operators.hello_world import HelloWorldOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
dag = DAG("hello_world_test",
description='Testing out a operator',
start_date=days_ago(1),
schedule_interval=None,
catchup=False,
default_args=default_args)
get_token = HelloWorldOperator(
task_id='hello_world_task_1',
name='My input to generate a token',
dag=dag
)
token = "My token" # Want this to be the return value from get_token
run_this = HelloWorldOperator(
task_id='hello_world_task_2',
name=token,
dag=dag
)
logging.info("Start")
get_token >> run_this
logging.info("End")
Hello World 运算符:
from airflow.models.baseoperator import BaseOperator
class HelloWorldOperator(BaseOperator):
def __init__(
self,
some_input: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.some_input = some_input
def execute(self, context):
# Bunch of business logic
token = "MyGeneratedToken"
return token
这是一个好的开始:)。
从另一个任务中检索令牌的正确方法是使用 jinja 模板
run_this = RetrieveToken(
task_id='hello_world_task_2',
retrieved_token="{{ ti.xcom_pull(task_ids=[\'hello_world_task_1\']) }}'",
dag=dag
)
但是,您必须记住在您的 RetrieveToken 中将 retrieved_token
添加到 template_fields
数组:https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#templating
您还可以在“检索”运算符中显式调用 xcom_pull
方法,并将“原始”任务 ID 传递给运算符以从正确的任务中检索它。
我需要创建一个气流运算符,它需要一些输入和 returns 一个字符串,该字符串将用作另一个运算符的输入,接下来 运行。我是 airflow dags 和操作员的新手,对如何正确执行此操作感到困惑。由于我是为使用气流和构建 dags 的人构建这个,而且我不是真正的气流用户或 dag 开发人员,所以我想获得有关正确执行它的建议。我创建了一个运算符,它 returns 是一个标记(只是一个字符串,所以 hello world 运算符示例可以正常工作)。这样做我在 dag 执行的 xcom 值中看到了值。但是我如何正确地检索该值并将其输入到下一个运算符中呢?对于我的示例,我只是调用了同一个操作员,但实际上它将调用不同的操作员。我只是不知道如何正确编码。我只是向 dag 添加代码吗?运营商是否需要添加代码?还是应该做其他事情?
示例 Dag:
import logging
import os
from airflow import DAG
from airflow.utils.dates import days_ago
from custom_operators.hello_world import HelloWorldOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
dag = DAG("hello_world_test",
description='Testing out a operator',
start_date=days_ago(1),
schedule_interval=None,
catchup=False,
default_args=default_args)
get_token = HelloWorldOperator(
task_id='hello_world_task_1',
name='My input to generate a token',
dag=dag
)
token = "My token" # Want this to be the return value from get_token
run_this = HelloWorldOperator(
task_id='hello_world_task_2',
name=token,
dag=dag
)
logging.info("Start")
get_token >> run_this
logging.info("End")
Hello World 运算符:
from airflow.models.baseoperator import BaseOperator
class HelloWorldOperator(BaseOperator):
def __init__(
self,
some_input: str,
**kwargs) -> None:
super().__init__(**kwargs)
self.some_input = some_input
def execute(self, context):
# Bunch of business logic
token = "MyGeneratedToken"
return token
这是一个好的开始:)。
从另一个任务中检索令牌的正确方法是使用 jinja 模板
run_this = RetrieveToken(
task_id='hello_world_task_2',
retrieved_token="{{ ti.xcom_pull(task_ids=[\'hello_world_task_1\']) }}'",
dag=dag
)
但是,您必须记住在您的 RetrieveToken 中将 retrieved_token
添加到 template_fields
数组:https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#templating
您还可以在“检索”运算符中显式调用 xcom_pull
方法,并将“原始”任务 ID 传递给运算符以从正确的任务中检索它。