How to generate a different ID for a task on Airflow?
I'm trying to call a function N times using the `@task` decorator, but if I call it more than once I can't define the `task_id` with this decorator. It says:
airflow.exceptions.DuplicateTaskIdFound: Task id 'my_task_group.make_request__1' has already been added to the DAG
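```python
from airflow.decorators import task

@task
def make_request(params):
    return True

def my_first_function():
    # do stuff
    return make_request(params)  # `params` is defined elsewhere (not shown)

def my_second_function():
    # do stuff
    return make_request(params)

# This loop runs inside the `my_task_group` TaskGroup (DAG context not shown)
for i in range(0, 10):
    first = my_first_function()  # this will call "make_request"
    second = my_second_function()  # this will also call "make_request"
    first >> second
```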
How can I dynamically "rename" the `task_id` on the `@task` decorator?
Using `@task` lets Airflow generate the `task_id` dynamically each time the decorated function is called. The docs for `_get_unique_task_id` state:
> Generate unique task id given a DAG (or if run in a DAG context)
>
> Ids are generated by appending a unique number to the end of the original task id.
>
> Example: `task_id`, `task_id__1`, `task_id__2`, ..., `task_id__20`
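To make the documented naming scheme concrete, here is a minimal pure-Python sketch of it. `unique_task_id` is a hypothetical helper modeled on the docstring above, not Airflow's actual implementation; it assumes the already-registered IDs are passed in explicitly rather than read from a DAG.

```python
import re
from typing import Iterable


def unique_task_id(task_id: str, existing_ids: Iterable[str]) -> str:
    """Hypothetical sketch of the "append __N" scheme; not Airflow's code."""
    existing = set(existing_ids)
    if task_id not in existing:
        # First use of this ID: keep it unchanged.
        return task_id
    # Strip any trailing "__N" so "t__1" and "t" share the core name "t".
    core = re.split(r"__\d+$", task_id)[0]
    pattern = re.compile(rf"^{re.escape(core)}__(\d+)$")
    suffixes = [int(m.group(1)) for tid in existing if (m := pattern.match(tid))]
    # Next free number is one past the highest suffix already taken.
    return f"{core}__{max(suffixes, default=0) + 1}"


ids = []
for _ in range(3):
    ids.append(unique_task_id("my_first_function", ids))
print(ids)  # ['my_first_function', 'my_first_function__1', 'my_first_function__2']
```

Taking the maximum existing suffix (instead of counting matches) keeps the result unique even if some numbered IDs are missing.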
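With this feature there is no need to "rename" tasks dynamically. In your code sample, you should decorate the functions that are called inside the loop. Here is a working example, run on version 2.0.1: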
```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1), catchup=False, tags=['example'])
def task_decorator_example():

    def make_request(params):
        print(f"Params: {params}")

    def _print_task_id():
        context = get_current_context()
        print(f"Result: {context['ti'].task_id}")

    @task
    def my_first_function():
        _print_task_id()
        context = get_current_context()
        return make_request(context['params'])

    @task
    def my_second_function():
        _print_task_id()
        params = {'foo': 'bar'}
        return make_request(params)

    for i in range(0, 3):
        first = my_first_function()  # this will call "make_request"
        second = my_second_function()  # this will also call "make_request"
        first >> second


example_decorated_dag = task_decorator_example()
```
This creates three independent `my_first_function >> my_second_function` chains in the Graph View.
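The wiring produced by the loop can be imitated without Airflow. The sketch below uses a hypothetical `ToyDag` class (not Airflow's API) and a stand-in `DuplicateTaskIdError` for `airflow.exceptions.DuplicateTaskIdFound`; it assumes each call of a decorated function registers one task, suffixed with `__N` when the plain name is taken, while adding an ID that already exists raises.

```python
import re
from typing import List, Tuple


class DuplicateTaskIdError(Exception):
    """Hypothetical stand-in for airflow.exceptions.DuplicateTaskIdFound."""


class ToyDag:
    """Hypothetical toy model of a DAG's task registry; not Airflow's API."""

    def __init__(self) -> None:
        self.task_ids: List[str] = []
        self.edges: List[Tuple[str, str]] = []

    def add_task(self, task_id: str) -> str:
        # An ID may be registered only once, like the error in the question.
        if task_id in self.task_ids:
            raise DuplicateTaskIdError(f"Task id '{task_id}' has already been added to the DAG")
        self.task_ids.append(task_id)
        return task_id

    def call_decorated(self, name: str) -> str:
        # Mimics calling a decorated function: add "__N" when `name` is taken.
        if name not in self.task_ids:
            return self.add_task(name)
        pattern = re.compile(rf"^{re.escape(name)}__(\d+)$")
        taken = [int(m.group(1)) for tid in self.task_ids if (m := pattern.match(tid))]
        return self.add_task(f"{name}__{max(taken, default=0) + 1}")


dag = ToyDag()
for _ in range(3):
    first = dag.call_decorated("my_first_function")
    second = dag.call_decorated("my_second_function")
    dag.edges.append((first, second))  # plays the role of `first >> second`

for upstream, downstream in dag.edges:
    print(f"{upstream} >> {downstream}")
```

This prints one line per chain, from `my_first_function >> my_second_function` through `my_first_function__2 >> my_second_function__2`, whereas calling `dag.add_task("my_first_function")` again raises `DuplicateTaskIdError`.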
Each task prints its `task_id` and `params`, and the combined log output looks like this:
```
- my_first_function
{logging_mixin.py:104} INFO - Result: my_first_function
{logging_mixin.py:104} INFO - Params: {}
- my_second_function
{logging_mixin.py:104} INFO - Result: my_second_function
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__1
{logging_mixin.py:104} INFO - Result: my_first_function__1
{logging_mixin.py:104} INFO - Params: {}
- my_second_function__1
{logging_mixin.py:104} INFO - Result: my_second_function__1
{logging_mixin.py:104} INFO - Params: {'foo': 'bar'}
- my_first_function__2
{logging_mixin.py:104} INFO - Result: my_first_function__2
{logging_mixin.py:104} INFO - Params: {}
```
Hope this works for you!