How to generate a different ID for a task in Airflow?

I'm trying to call a function N times using the @task decorator, but I can't set a task_id with this decorator. When I call it more than once, Airflow raises:

airflow.exceptions.DuplicateTaskIdFound: Task id 'my_task_group.make_request__1' has already been added to the DAG

from airflow.decorators import task

@task
def make_request(params):
    return True

def my_first_function():
    # do stuff
    return make_request(params)

def my_second_function():
    # do stuff
    return make_request(params)

for i in range(0, 10):
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"

    first >> second

How can I dynamically "rename" the task_id of a @task-decorated function?

With @task, a task_id is generated automatically each time the decorated function is called, using _get_unique_task_id. Its docstring states:

Generate unique task id given a DAG (or if run in a DAG context). Ids are generated by appending a unique number to the end of the original task id. Example: task_id, task_id__1, task_id__2, ..., task_id__20
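To make the naming scheme concrete, here is a minimal pure-Python sketch. It is not Airflow's code: `unique_task_id` and the `existing_ids` set are hypothetical stand-ins for the DAG's registry of task IDs, and it assumes the new suffix is one more than the highest `__N` already in use for that base name.

```python
import re


def unique_task_id(task_id: str, existing_ids: set) -> str:
    """Return task_id, or task_id with a "__N" suffix if it is already taken.

    Hypothetical, simplified re-creation of the scheme described in the
    docstring; not Airflow's actual implementation.
    """
    if task_id not in existing_ids:
        return task_id
    # Strip any trailing "__N" to recover the base name.
    core = re.sub(r"__\d+$", "", task_id)
    pattern = re.compile(rf"^{re.escape(core)}__(\d+)$")
    # Collect the numeric suffixes already used for this base name.
    suffixes = [int(m.group(1)) for tid in existing_ids if (m := pattern.match(tid))]
    return f"{core}__{max(suffixes, default=0) + 1}"


# Simulate calling the same decorated function four times inside one DAG.
registered = set()
ids = []
for _ in range(4):
    new_id = unique_task_id("make_request", registered)
    registered.add(new_id)
    ids.append(new_id)

print(ids)  # ['make_request', 'make_request__1', 'make_request__2', 'make_request__3']
```

The first call keeps the plain name; every later call with the same base name gets the next free counter, which is why no manual renaming is needed.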

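Because of this automatic numbering, there is no need to "rename" tasks dynamically. In your example, the functions you call inside the loop are the ones that should be decorated with @task. Here is a working example, tested on Airflow 2.0.1: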

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

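    for _ in range(0, 3):
        # Each call to a @task-decorated function adds a new task to the DAG;
        # repeated calls get suffixed IDs (my_first_function__1, __2, ...).
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"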

        first >> second


example_decorated_dag = task_decorator_example()

This creates a DAG whose Graph View shows three independent my_first_function >> my_second_function chains.

Each task prints its task_id and params. The combined log output looks like this:

- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}
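The task IDs and dependencies in the log above can be reproduced without Airflow. The sketch below is a hypothetical model, not Airflow code: `simulate_dag` and `new_id` are illustrative names, and it assumes (as the log suggests) that the n-th repeated call of a decorated function gets the suffix `__n`, with the first call unsuffixed. Each `first >> second` is recorded as an edge.

```python
from collections import defaultdict


def simulate_dag(iterations: int):
    """Hypothetical model of the example DAG: returns (task IDs, edges)."""
    calls = defaultdict(int)  # how many times each base name has been used
    task_ids = []
    edges = []

    def new_id(base: str) -> str:
        n = calls[base]
        calls[base] += 1
        # First call keeps the plain name; later calls get "__1", "__2", ...
        return base if n == 0 else f"{base}__{n}"

    for _ in range(iterations):
        first = new_id("my_first_function")
        second = new_id("my_second_function")
        task_ids.extend([first, second])
        edges.append((first, second))  # models `first >> second`
    return task_ids, edges


ids, deps = simulate_dag(3)
print(ids)
print(deps)
```

With three iterations this yields six unique task IDs and three separate two-task chains, matching the Graph View described above.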

Hope this works for you!