传递给变量的日期时间被视为字符串 - Python
Datetime passed to a variable is treated as a string - Python
我正在研究 Airflow DAG。我有两个 DAG,比如 DAG A 和 DAG B。我想根据 DAG A 的 execution_date 在 DAG B 中做一些事情。为此,我正在使用 Airflow 变量。
在 DAG A 中:
def set_execution_date(**kwargs):
Variable.set('var_name',kwargs['execution_date'])
status = Variable.get(time_status)
这会使用 DAG A 的 execution_date 更新我的气流变量,如下所示:
现在我在 DAG B 中使用 airflow 变量的这个值:
def check_task_status(**kwargs):
date= Variable.get('stream_execution_date')
ti = get_task_instance('STREAMING_TEST', 'start_group', date)
我的 DAG B 针对上述定义抛出以下错误:
airflow 变量中存储的日期时间值在 DAG B 中使用时被视为字符串。相反,我想将其用作日期时间值。
感谢任何帮助。
使用 Variable.set()
时的气流 always stores a string。根据您的需要,您可以获得更高的精度,但一个简单的解决方案是将 execution_date
datetime 转换为 isoformat,然后在获取值时将其转换回 datetime。代码将如下所示:
# Import section
from datetime import datetime
# Functions section
def set_execution_date(**kwargs):
Variable.set('stream_execution_date', kwargs['execution_date'].isoformat())
status = Variable.get(time_status)
def check_task_status(**kwargs):
date = datetime.fromisoformat(Variable.get('stream_execution_date'))
ti = get_task_instance('STREAMING_TEST', 'start_group', date)
我正在研究 Airflow DAG。我有两个 DAG,比如 DAG A 和 DAG B。我想根据 DAG A 的 execution_date 在 DAG B 中做一些事情。为此,我正在使用 Airflow 变量。
在 DAG A 中:
def set_execution_date(**kwargs):
Variable.set('var_name',kwargs['execution_date'])
status = Variable.get(time_status)
这会使用 DAG A 的 execution_date 更新我的气流变量,如下所示:
现在我在 DAG B 中使用 airflow 变量的这个值:
def check_task_status(**kwargs):
date= Variable.get('stream_execution_date')
ti = get_task_instance('STREAMING_TEST', 'start_group', date)
我的 DAG B 针对上述定义抛出以下错误:
airflow 变量中存储的日期时间值在 DAG B 中使用时被视为字符串。相反,我想将其用作日期时间值。
感谢任何帮助。
使用 Variable.set()
时的气流 always stores a string。根据您的需要,您可以获得更高的精度,但一个简单的解决方案是将 execution_date
datetime 转换为 isoformat,然后在获取值时将其转换回 datetime。代码将如下所示:
# Import section
from datetime import datetime
# Functions section
def set_execution_date(**kwargs):
Variable.set('stream_execution_date', kwargs['execution_date'].isoformat())
status = Variable.get(time_status)
def check_task_status(**kwargs):
date = datetime.fromisoformat(Variable.get('stream_execution_date'))
ti = get_task_instance('STREAMING_TEST', 'start_group', date)