未按预期读取气流宏
Airflow macro not being read as expected
我需要一个 Airflow 宏值,但返回的字符串没有按预期读取,我得到的只是一个损坏的 DAG。我已经在终端测试了部分脚本,看看是否有问题,但似乎并非如此。
我的期望是像“2016-06-28T16:51:45.978473-05:00”这样的字符串变成“2016-06-28T16:51”
这是代码。这部分出现在 DAG 装饰器范围 with DAG(..) as dag:
.
之前
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
错误信息:
Broken DAG: [<path-to-dag>/processing_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<path-to-dag>/processing_dag.py", line 16, in <module>
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
IndexError: list index out of range
这意味着我没有以 Airflow docs 指定的格式获得 '{{ execution_date }}'
。
运行 Airflow 服务器的 DAG 脚本不会激活宏并且 DAG 已损坏,所以我不知道如何调试代码。有没有办法打印 '{{ execution_date }}'
的值,以便我了解发生了什么?
[编辑] 根据要求,这里是脚本的一些相关部分。导入的模块是:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datamechanics_airflow_plugin.operator import DataMechanicsOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
from datetime import datetime
import pendulum
import re
脚本顶部:
local_tz = pendulum.timezone("America/Sao_Paulo")
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
with DAG(
dag_id="processing_dag",
start_date=days_ago(0, second = 1).astimezone(tz=local_tz),
schedule_interval="@daily",
) as dag:
<tasks, etc>...
Jinja 模板化字符串或宏直到 task/DAG 运行时才会被评估。然而,顶级代码(也就是出现在运算符 execute()
方法之外的逻辑)在调度程序的每个文件解析间隔执行。
这里发生的是这两行代码正在评估 {{ execution_date }}
的 文字字符串 而不是您期望的日期时间字符串:
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
抛出的错误是由调度程序解析 DAG 文件并执行顶级代码引起的。由于正则表达式没有 return 结果,索引访问失败,因为在文字字符串 {{ execution_date }}
.
中没有匹配的正则表达式字符串模式
理想情况下,Jinja 表达式和相关逻辑将成为 template_field
的一部分,在运算符文档中注明,或者在被调用的运算符逻辑中。
我需要一个 Airflow 宏值,但返回的字符串没有按预期读取,我得到的只是一个损坏的 DAG。我已经在终端测试了部分脚本,看看是否有问题,但似乎并非如此。
我的期望是像“2016-06-28T16:51:45.978473-05:00”这样的字符串变成“2016-06-28T16:51”
这是代码。这部分出现在 DAG 装饰器范围 with DAG(..) as dag:
.
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
错误信息:
Broken DAG: [<path-to-dag>/processing_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "<path-to-dag>/processing_dag.py", line 16, in <module>
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
IndexError: list index out of range
这意味着我没有以 Airflow docs 指定的格式获得 '{{ execution_date }}'
。
运行 Airflow 服务器的 DAG 脚本不会激活宏并且 DAG 已损坏,所以我不知道如何调试代码。有没有办法打印 '{{ execution_date }}'
的值,以便我了解发生了什么?
[编辑] 根据要求,这里是脚本的一些相关部分。导入的模块是:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datamechanics_airflow_plugin.operator import DataMechanicsOperator
from airflow.models import Variable
from airflow.utils.dates import days_ago
from datetime import datetime
import pendulum
import re
脚本顶部:
local_tz = pendulum.timezone("America/Sao_Paulo")
exec_date = '{{ execution_date }}'
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
with DAG(
dag_id="processing_dag",
start_date=days_ago(0, second = 1).astimezone(tz=local_tz),
schedule_interval="@daily",
) as dag:
<tasks, etc>...
Jinja 模板化字符串或宏直到 task/DAG 运行时才会被评估。然而,顶级代码(也就是出现在运算符 execute()
方法之外的逻辑)在调度程序的每个文件解析间隔执行。
这里发生的是这两行代码正在评估 {{ execution_date }}
的 文字字符串 而不是您期望的日期时间字符串:
exec_date = re.findall(r"^[\d]{4}-[\d]{2}-[\d]{2}T[\d]{2}:[\d]{2}", exec_date)[0]
exec_date = datetime.strptime(exec_date, "%Y-%m-%dT%H:%M")
抛出的错误是由调度程序解析 DAG 文件并执行顶级代码引起的。由于正则表达式没有 return 结果,索引访问失败,因为在文字字符串 {{ execution_date }}
.
理想情况下,Jinja 表达式和相关逻辑将成为 template_field
的一部分,在运算符文档中注明,或者在被调用的运算符逻辑中。