Airflow wrong log_id format

I'm running Airflow v2.2.3 with apache-airflow-providers-elasticsearch==2.1.0 on Kubernetes.

Our logs are shipped automatically to Elasticsearch v7.6.2.

I set the following configuration in Airflow:

AIRFLOW__LOGGING__REMOTE_LOGGING=True
AIRFLOW__ELASTICSEARCH__HOST=<my-elasticsearch-host>:9200
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=True
AIRFLOW__ELASTICSEARCH__JSON_FORMAT=True
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE={dag_id}-{task_id}-{execution_date}-{try_number}

I can see the logs in stdout as JSON:

{
    "asctime": "2022-01-20 12:13:52,292",
    "filename": "taskinstance.py",
    "lineno": 1032,
    "levelname": "INFO",
    "message": "Dependencies all met for <TaskInstance: spark_jobs_8765280.check_rawevents_output scheduled__2022-01-19T07:00:00+00:00 [queued]>", 
    "offset": 1642680832292384000,
    "dag_id": "spark_jobs_8765280",
    "task_id": "check_raweven ts_output",
    "execution_date": "2022_01_19T07_00_00_000000",
    "try_number": "1",
    "log_id": "spark_jobs_8765280-check_rawevents_output-2022_01_19T07_00_00_000000-1"
}

And I do see all of these fields in Elasticsearch as well - so far so good. Now I expect to see these logs in the Airflow webserver UI, but nothing shows up (Airflow fails to retrieve the logs).
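
To rule out Elasticsearch itself, one way to check that the document is reachable under the cleaned log_id is to count matches on it directly (a minimal sketch using the elasticsearch Python client; the match_phrase query should mirror what Airflow's Elasticsearch task handler issues, and the host is a placeholder):

from elasticsearch import Elasticsearch

# Same placeholder host as in the Airflow config above.
es = Elasticsearch("http://<my-elasticsearch-host>:9200")

# Count documents whose log_id exactly matches the cleaned form.
resp = es.count(body={
    "query": {
        "match_phrase": {
            "log_id": "spark_jobs_8765280-check_rawevents_output-2022_01_19T07_00_00_000000-1"
        }
    }
})
print(resp["count"])  # > 0 means the log line is findable under the cleaned log_id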

When I manually update the log document and change the "log_id" format from:

"log_id": "spark_jobs_8765280-check_rawevents_output-2022_01_19T07_00_00_000000-1"

to:

"log_id": "spark_jobs_8765280-check_rawevents_output-2022-01-19T07:00:00+00:00-1"

I DO see the logs in the UI!

Why does the Airflow UI try to fetch the logs using the unescaped log_id instead of the correct (cleaned) one?

P.S. - these are the logs from the Airflow webserver:

[2022-01-20 14:08:21,170] {base.py:270} INFO - POST http://<my-elasticsearch-host>:9200/_count [status:200 request:0.008s]
10.1.19.65 - - [20/Jan/2022:14:08:21 +0000] "GET /get_logs_with_metadata?dag_id=spark_jobs_8765280&task_id=check_rawevents_output&execution_date=2022-01-19T07%3A00%3A00%2B00%3A00&try_number=1&metadata=%7B%22end_of_log%22%3Afalse%2C%22last_log_timestamp%22%3A%222022-01-20T14%3A05%3A55.439492%2B00%3A00%22%2C%22offset%22%3A%221691%22%7D HTTP/1.1" 200 119 "https://<my-airflow-webserver-host>/log?dag_id=spark_jobs_8765280&task_id=check_rawevents_output&execution_date=2022-01-19T07%3A00%3A00%2B00%3A00" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"

I found the problem. I run separate pods for the airflow-scheduler and the airflow-webserver.

I had only added AIRFLOW__ELASTICSEARCH__JSON_FORMAT=True to the airflow-scheduler and workers, but not to the airflow-webserver.
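
A quick way to catch this kind of drift is to print the resolved config value inside each pod and compare (a minimal sketch; AIRFLOW__ELASTICSEARCH__JSON_FORMAT maps to the [elasticsearch] json_format option):

from airflow.configuration import conf

# Run this in the scheduler, worker, and webserver containers;
# all of them must agree for the log_id formats to match.
print(conf.getboolean("elasticsearch", "json_format"))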

I dug into the source code and found that the webserver checks AIRFLOW__ELASTICSEARCH__JSON_FORMAT as well, in order to build the log_id with the dates cleaned into the correct format:

# From Airflow's Elasticsearch task handler: with json_format enabled the
# dates are "cleaned" (reserved characters replaced), otherwise kept as
# plain ISO 8601 - which is why the two sides disagreed on the log_id.
if self.json_format:
    data_interval_start = self._clean_date(dag_run.data_interval_start)
    data_interval_end = self._clean_date(dag_run.data_interval_end)
    execution_date = self._clean_date(dag_run.execution_date)
else:
    data_interval_start = dag_run.data_interval_start.isoformat()
    data_interval_end = dag_run.data_interval_end.isoformat()
    execution_date = dag_run.execution_date.isoformat()
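
For reference, the cleaning step reformats the date so it contains none of the characters that are reserved in Elasticsearch query strings; in Airflow 2.2.x the handler's _clean_date boils down to roughly the following (a minimal re-implementation for illustration, with clean_date as a stand-in for the real method):

from datetime import datetime, timezone

def clean_date(value: datetime) -> str:
    # Drop the characters (-, :, +, .) that Elasticsearch treats as
    # reserved in query strings; note the UTC offset is lost.
    return value.strftime("%Y_%m_%dT%H_%M_%S_%f")

execution_date = datetime(2022, 1, 19, 7, 0, 0, tzinfo=timezone.utc)
print(clean_date(execution_date))  # 2022_01_19T07_00_00_000000
print(execution_date.isoformat())  # 2022-01-19T07:00:00+00:00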