如何在气流中恰好一次生成 datetime.now()
How to generate datetime.now() in airflow exactly one time
我在尝试根据 name + current date
.
生成唯一名称时遇到错误
NAME = 'name-{timestamp}'.format(timestamp=datetime.now())
但是每个任务实例都会再次生成 datetime.now()
并且每个任务的 NAME
都不同。
task1 = CustomOperator(
task_id='task-1',
name = NAME,
...
)
task2 = CustomOperator(
task_id='task-2',
name = NAME,
...
)
里面的task1
和task2
NAME
会不一样。我需要 NAME
是唯一的,并且可以从 DAG 中的任务实例进行全局访问。有什么建议吗?
预计 Airflow 任务是静态的或缓慢变化的。 Airflow 每隔 min_file_process_interval
(默认 30 秒)解析 DAG 文件 - 这意味着每 30 秒您将创建一个新任务 - 甚至可能不会 运行.
很难理解你为什么要创建这样的任务,因为你没有解释你的用例,但如果你需要动态任务,你可以这样做:
for i in range(0, 5):
DummyOperator(task_id='{0}'.format(i))
编辑: 现在您编辑了您的问题并更好地解释了。 name 参数是您的 Operator 的一些自定义参数。它可以是任何你想要的。你可以做什么:
timestamp = datetime.now()
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name=f'{task_id}_{str(timestamp)}')
请注意,虽然这会为您提供独特的价值,但您很难跟踪它们。一种更好、更流畅的方法是使用 {{ execution_date }}
,假设 name
是 templated 字段:
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name=f'{task_id}_{{{{ execution_date }}}}')
或者您可以使用 {{ task_instance_key_str }}
宏,它是格式为 {dag_id}__{task_id}__{ds_nodash}
的任务实例的唯一的、人类可读的键
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name='{{ task_instance_key_str }}')
正如 和其他评论者所建议的那样,您可能 不 想要这个。也许您应该在 execute
方法中生成并 logging/emitting 此信息。
尽管如此,对于跨各种领域、任务和编程语言的编程新手来说,这是一个非常常见的“陷阱”,所以我认为值得在这里展示模式,如果只是为了教学目的。
答案很简单:计算时间一次,并将其保存在变量中。而已。就这么简单。这就是变量的用途。
我还强烈建议使用 datetime.strftime
方法显式格式化时间戳,而不是依赖 str()
隐式地为您完成。盲目 str
化是另一个典型的新手错误。
最后,使用时区感知日期时间,并且使用 UTC。不要想太多。去做就对了。以后你会感谢我的。
完整示例:
from __future__ import annotations
import sys
from datetime import datetime, timezone
from typing import Any
if sys.version_info < (3, 9):
from typing import Dict
else:
Dict = dict
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
# Define a custom operator
class MyOperator(BaseOperator):
"""A custom Airflow operator that doesn't really do anything."""
def __init__(self, task_id: str, name: str, **kwargs) -> None:
self.task_id = task_id
self.name = name
super().__init__(**kwargs)
def execute(self, context: Dict[str, Any]) -> int:
print(
f'Hello, I am {self.name}, '
f'and I am executing Task ID {self.task_id}.'
)
return len(context)
# Pre-compute the time, making sure to use UTC.
now = datetime.now(timezone.utc)
# Format the time, unambiguously.
now_fmt = now.strftime('%Y-%m-%d_%H:%M:%S%z')
# Another option for formatting.
# See:
# * https://docs.python.org/3/library/datetime.html#datetime.datetime.isoformat
# * https://en.wikipedia.org/wiki/ISO_8601
# * https://datatracker.ietf.org/doc/html/rfc3339
# now_fmt = now.isoformat()
# Use the formatted time string as many times as you need.
with DAG(...) as dag:
task1 = MyOperator(
task_id='task-1',
name = f'task-1_{now_fmt}',
...
)
task2 = MyOperator(
task_id='task-2',
name = f'task-2_{now_fmt},
...
)
我在尝试根据 name + current date
.
NAME = 'name-{timestamp}'.format(timestamp=datetime.now())
但是每个任务实例都会再次生成 datetime.now()
并且每个任务的 NAME
都不同。
task1 = CustomOperator(
task_id='task-1',
name = NAME,
...
)
task2 = CustomOperator(
task_id='task-2',
name = NAME,
...
)
里面的task1
和task2
NAME
会不一样。我需要 NAME
是唯一的,并且可以从 DAG 中的任务实例进行全局访问。有什么建议吗?
预计 Airflow 任务是静态的或缓慢变化的。 Airflow 每隔 min_file_process_interval
(默认 30 秒)解析 DAG 文件 - 这意味着每 30 秒您将创建一个新任务 - 甚至可能不会 运行.
很难理解你为什么要创建这样的任务,因为你没有解释你的用例,但如果你需要动态任务,你可以这样做:
for i in range(0, 5):
DummyOperator(task_id='{0}'.format(i))
编辑: 现在您编辑了您的问题并更好地解释了。 name 参数是您的 Operator 的一些自定义参数。它可以是任何你想要的。你可以做什么:
timestamp = datetime.now()
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name=f'{task_id}_{str(timestamp)}')
请注意,虽然这会为您提供独特的价值,但您很难跟踪它们。一种更好、更流畅的方法是使用 {{ execution_date }}
,假设 name
是 templated 字段:
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name=f'{task_id}_{{{{ execution_date }}}}')
或者您可以使用 {{ task_instance_key_str }}
宏,它是格式为 {dag_id}__{task_id}__{ds_nodash}
for i in range(1, 3):
task_id = f'task_{i}'
CustomOperator(task_id=task_id, name='{{ task_instance_key_str }}')
正如 execute
方法中生成并 logging/emitting 此信息。
尽管如此,对于跨各种领域、任务和编程语言的编程新手来说,这是一个非常常见的“陷阱”,所以我认为值得在这里展示模式,如果只是为了教学目的。
答案很简单:计算时间一次,并将其保存在变量中。而已。就这么简单。这就是变量的用途。
我还强烈建议使用 datetime.strftime
方法显式格式化时间戳,而不是依赖 str()
隐式地为您完成。盲目 str
化是另一个典型的新手错误。
最后,使用时区感知日期时间,并且使用 UTC。不要想太多。去做就对了。以后你会感谢我的。
完整示例:
from __future__ import annotations
import sys
from datetime import datetime, timezone
from typing import Any
if sys.version_info < (3, 9):
from typing import Dict
else:
Dict = dict
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
# Define a custom operator
class MyOperator(BaseOperator):
"""A custom Airflow operator that doesn't really do anything."""
def __init__(self, task_id: str, name: str, **kwargs) -> None:
self.task_id = task_id
self.name = name
super().__init__(**kwargs)
def execute(self, context: Dict[str, Any]) -> int:
print(
f'Hello, I am {self.name}, '
f'and I am executing Task ID {self.task_id}.'
)
return len(context)
# Pre-compute the time, making sure to use UTC.
now = datetime.now(timezone.utc)
# Format the time, unambiguously.
now_fmt = now.strftime('%Y-%m-%d_%H:%M:%S%z')
# Another option for formatting.
# See:
# * https://docs.python.org/3/library/datetime.html#datetime.datetime.isoformat
# * https://en.wikipedia.org/wiki/ISO_8601
# * https://datatracker.ietf.org/doc/html/rfc3339
# now_fmt = now.isoformat()
# Use the formatted time string as many times as you need.
with DAG(...) as dag:
task1 = MyOperator(
task_id='task-1',
name = f'task-1_{now_fmt}',
...
)
task2 = MyOperator(
task_id='task-2',
name = f'task-2_{now_fmt},
...
)