airflow.exceptions.AirflowException: Dag could not be found; either it does not exist or it failed to parse
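Following the steps at https://airflow.apache.org/docs/apache-airflow/stable/upgrading-from-1-10/index.html, I recently upgraded Airflow from 1.10.11 to 2.2.3. I first upgraded to 1.10.15 as recommended, and that worked fine. After upgrading to 2.2.3, however, I cannot run DAGs from the UI because the tasks get stuck in the queued state. When I check the task pod logs, I see this error: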
[2022-02-22 06:46:23,886] {cli_action_loggers.py:105} WARNING - Failed to log action with (sqlite3.OperationalError) no such table: log
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (?, ?, ?, ?, ?, ?, ?)]
[parameters: ('2022-02-22 06:46:23.880923', 'dag id', 'task id', 'cli_task_run', None, 'airflow', '{"host_name": "pod name", "full_command": "[\'/home/airflow/.local/bin/airflow\', \'tasks\', \ task id\', \'manual__2022-02-22T06:45:47.840912+00:00\', \'--local\', \'--subdir\', \'DAGS_FOLDER/dag_file.py\']"}')]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-02-22 06:46:23,888] {dagbag.py:500} INFO - Filling up the DagBag from /opt/airflow/dags/repo/xxxxx.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 282, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 193, in get_dag
    f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'xxxxx' could not be found; either it does not exist or it failed to parse
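I did try exec-ing into the webserver and the scheduler with "kubectl exec -it airflow-dev-webserver-6c5755d5dd-262wd -n dev --container webserver -- /bin/sh", and I can see all the DAGs under /opt/airflow/dags/repo/. Even the error output says "Filling up the DagBag from /opt/airflow/dags/repo/", but I cannot work out what is leaving the task stuck in the queued state.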
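The error message covers two different cases: the DAG file is not there at all, or it is there but cannot be parsed. Below is a minimal sketch, using only the Python standard library, that could be run inside the task pod to tell the two apart. The function name `check_dag_files` is hypothetical, and the check only catches syntax-level problems; it does not reproduce what Airflow does when it imports a DAG file, so failures such as a missing package would not show up here.

```python
import ast
from pathlib import Path


def check_dag_files(dags_folder):
    """Map every .py file under dags_folder to None (parses) or an error string.

    An empty result means no .py files were found, i.e. the "does not exist" case.
    """
    results = {}
    for path in sorted(Path(dags_folder).rglob("*.py")):
        try:
            # Syntax check only: the file is parsed, never executed.
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
            results[str(path)] = None
        except (SyntaxError, ValueError, OSError) as exc:
            results[str(path)] = f"{type(exc).__name__}: {exc}"
    return results


if __name__ == "__main__":
    # /opt/airflow/dags is the DAGs folder used in this post; adjust as needed.
    report = check_dag_files("/opt/airflow/dags")
    if not report:
        print("no .py files found: the DAG files are missing from this pod")
    for name, error in report.items():
        print(name, "->", error or "parses OK")
```

If the report is empty in the task pod but not in the webserver or scheduler, the files never reached the task pod, which points at the sync step rather than at the DAG code.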
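I solved this problem with the following steps:
I triggered a DAG, after which I could see the task pod go into an error state. So I ran "kubectl logs {pod_name} git-sync" to check whether the DAGs were being copied in the first place. I then found the following error:
[image: git-sync error output]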
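I then realized it was a problem with write permissions on the DAGs folder. To fix it, I set "readOnly: false" under the "volumeMounts" section.
[image: volumeMounts section with readOnly: false]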
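A quick way to confirm a write-permission problem like this one is to try creating a file in the mounted directory. The sketch below is only an illustration: the helper name `can_write` is hypothetical, and it has to be run from a container that has Python available and mounts the same volume (for example the Airflow container; the git-sync image may not ship Python).

```python
import tempfile


def can_write(directory):
    """Return True if a temporary file can be created (and removed) in directory."""
    try:
        # The file is deleted automatically when the with-block exits.
        with tempfile.NamedTemporaryFile(dir=directory):
            pass
        return True
    except OSError:
        # Covers a read-only mount, missing permissions, or a missing directory.
        return False


if __name__ == "__main__":
    # /opt/airflow/dags is the mount path used in this post; adjust as needed.
    print("/opt/airflow/dags writable:", can_write("/opt/airflow/dags"))
```

A result of False on a volume that git-sync is supposed to fill would be consistent with the mount being read-only or owned by a different user.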
That's it! It worked. Here is what finally worked:
Pod template file:
apiVersion: v1
kind: Pod
metadata:
  labels:
    component: worker
    release: airflow-dev
    tier: airflow
spec:
  containers:
  - args: []
    command: []
    env:
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
      value: ECR repo link
    - name: AIRFLOW__SMTP__SMTP_PORT
      value: '587'
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
      value: docker image tag
    - name: AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER
      value: '65533'
    - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
      value: 'True'
    - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM
      value: dw-airflow-dev-logs
    - name: AIRFLOW__KUBERNETES__RUN_AS_USER
      value: '50000'
    - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
      value: 'False'
    - name: AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION
      value: 'False'
    - name: AIRFLOW__SMTP__SMTP_MAIL_FROM
      value: email id
    - name: AIRFLOW__CORE__LOAD_EXAMPLES
      value: 'False'
    - name: AIRFLOW__SMTP__SMTP_PASSWORD
      value: xxxxxxxxx
    - name: AIRFLOW__SMTP__SMTP_HOST
      value: smtp-relay.gmail.com
    - name: AIRFLOW__KUBERNETES__NAMESPACE
      value: dev
    - name: AIRFLOW__SMTP__SMTP_USER
      value: xxxxxxxxxx
    - name: AIRFLOW__CORE__EXECUTOR
      value: LocalExecutor
    - name: AIRFLOW_HOME
      value: /opt/airflow
    - name: AIRFLOW__CORE__DAGS_FOLDER
      value: /opt/airflow/dags
    - name: AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT
      value: /opt/airflow/dags
    - name: AIRFLOW__KUBERNETES__FS_GROUP
      value: "50000"
    - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
      valueFrom:
        secretKeyRef:
          key: connection
          name: airflow-dev-airflow-metadata
    - name: AIRFLOW_CONN_AIRFLOW_DB
      valueFrom:
        secretKeyRef:
          key: connection
          name: airflow-dev-airflow-metadata
    - name: AIRFLOW__CORE__FERNET_KEY
      valueFrom:
        secretKeyRef:
          key: fernet-key
          name: airflow-dev-fernet-key
    envFrom: []
    image: docker image
    imagePullPolicy: IfNotPresent
    name: base
    ports: []
    volumeMounts:
    - mountPath: /opt/airflow/dags
      name: airflow-dags
      readOnly: false
      subPath: /repo
    - mountPath: /opt/airflow/logs
      name: airflow-logs
    - mountPath: /etc/git-secret/ssh
      name: git-sync-ssh-key
      subPath: ssh
    - mountPath: /opt/airflow/airflow.cfg
      name: airflow-config
      readOnly: true
      subPath: airflow.cfg
    - mountPath: /opt/airflow/config/airflow_local_settings.py
      name: airflow-config
      readOnly: true
      subPath: airflow_local_settings.py
  hostNetwork: false
  imagePullSecrets:
  - name: airflow-dev-registry
  initContainers:
  - env:
    - name: GIT_SYNC_REPO
      value: xxxxxxxxxxxxx
    - name: GIT_SYNC_BRANCH
      value: master
    - name: GIT_SYNC_ROOT
      value: /git
    - name: GIT_SYNC_DEST
      value: repo
    - name: GIT_SYNC_DEPTH
      value: '1'
    - name: GIT_SYNC_ONE_TIME
      value: 'true'
    - name: GIT_SYNC_REV
      value: HEAD
    - name: GIT_SSH_KEY_FILE
      value: /etc/git-secret/ssh
    - name: GIT_SYNC_ADD_USER
      value: 'true'
    - name: GIT_SYNC_SSH
      value: 'true'
    - name: GIT_KNOWN_HOSTS
      value: 'false'
    image: k8s.gcr.io/git-sync:v3.1.6
    name: git-sync
    securityContext:
      runAsUser: 65533
    volumeMounts:
    - mountPath: /git
      name: airflow-dags
      readOnly: false
    - mountPath: /etc/git-secret/ssh
      name: git-sync-ssh-key
      subPath: ssh
  nodeSelector: {}
  restartPolicy: Never
  securityContext:
    fsGroup: 50000
    runAsUser: 50000
  serviceAccountName: airflow-dev-worker-serviceaccount
  volumes:
  - emptyDir: {}
    name: airflow-dags
  - name: airflow-logs
    persistentVolumeClaim:
      claimName: dw-airflow-dev-logs
  - name: git-sync-ssh-key
    secret:
      items:
      - key: gitSshKey
        mode: 444
        path: ssh
      secretName: airflow-private-dags-dev
  - configMap:
      name: airflow-dev-airflow-config
    name: airflow-config