airflow.exceptions.AirflowException: Dag could not be found; either it does not exist or it failed to parse

Following the steps given at https://airflow.apache.org/docs/apache-airflow/stable/upgrading-from-1-10/index.html, I recently upgraded Airflow from 1.10.11 to 2.2.3. As recommended, I first upgraded to 1.10.15, which worked fine. But after upgrading to 2.2.3, I cannot execute DAGs from the UI: tasks just go into the queued state. When I check the task pod logs, I see this error:

[2022-02-22 06:46:23,886] {cli_action_loggers.py:105} WARNING - Failed to log action with (sqlite3.OperationalError) no such table: log
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (?, ?, ?, ?, ?, ?, ?)]
[parameters: ('2022-02-22 06:46:23.880923', 'dag id', 'task id', 'cli_task_run', None, 'airflow', '{"host_name": "pod name", "full_command": "[\'/home/airflow/.local/bin/airflow\', \'tasks\', \'task id\', \'manual__2022-02-22T06:45:47.840912+00:00\', \'--local\', \'--subdir\', \'DAGS_FOLDER/dag_file.py\']"}')]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-02-22 06:46:23,888] {dagbag.py:500} INFO - Filling up the DagBag from /opt/airflow/dags/repo/xxxxx.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 282, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 193, in get_dag
    f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'xxxxx' could not be found; either it does not exist or it failed to parse

I did try exec-ing into the webserver and the scheduler with "kubectl exec -it airflow-dev-webserver-6c5755d5dd-262wd -n dev --container webserver -- /bin/sh". I could see all the DAGs under /opt/airflow/dags/repo/. The error itself even says "Filling up the DagBag from /opt/airflow/dags/repo/", so I could not figure out what was keeping the task execution stuck in the queued state.
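
For reference, the check boils down to this (pod and namespace names are from my environment; yours will differ):

kubectl exec -it airflow-dev-webserver-6c5755d5dd-262wd -n dev --container webserver -- /bin/sh
# inside the pod: confirm the synced DAG files are actually there
ls /opt/airflow/dags/repo/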

I resolved the issue with the following steps:

I triggered a DAG, after which I could see the task pod go into the Error state. So I ran "kubectl logs {pod_name} git-sync" to check whether the DAGs were being copied over in the first place. That is where I found the following error: (screenshot of the git-sync error)
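
The same check in command form ({pod_name} is a placeholder for the failed task pod):

kubectl logs {pod_name} -n dev -c git-sync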

It turned out to be a permissions problem: git-sync did not have permission to write the DAGs into the DAGs folder. To fix it, I changed readOnly to false under the volumeMounts section, as shown in the fragment right after this paragraph.
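
A minimal view of the change, lifted from the full worker pod template below (the comment is my annotation):

volumeMounts:
- mountPath: /opt/airflow/dags
  name: airflow-dags
  readOnly: false      # was true; git-sync could not write DAG files while the mount was read-only
  subPath: /repo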

That was it!!! It worked. The following finally succeeded:

Pod template file:

apiVersion: v1
kind: Pod
metadata:
  labels:
    component: worker
    release: airflow-dev
    tier: airflow
spec:
  containers:
  - args: []
    command: []
    env:
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
      value: ECR repo link
    - name: AIRFLOW__SMTP__SMTP_PORT
      value: '587'
    - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
      value: docker image tag
    - name: AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER
      value: '65533'
    - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
      value: 'True'
    - name: AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM
      value: dw-airflow-dev-logs
    - name: AIRFLOW__KUBERNETES__RUN_AS_USER
      value: '50000'
    - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
      value: 'False'
    - name: AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION
      value: 'False'
    - name: AIRFLOW__SMTP__SMTP_MAIL_FROM
      value: email id
    - name: AIRFLOW__CORE__LOAD_EXAMPLES
      value: 'False'
    - name: AIRFLOW__SMTP__SMTP_PASSWORD
      value: xxxxxxxxx
    - name: AIRFLOW__SMTP__SMTP_HOST
      value: smtp-relay.gmail.com
    - name: AIRFLOW__KUBERNETES__NAMESPACE
      value: dev
    - name: AIRFLOW__SMTP__SMTP_USER
      value: xxxxxxxxxx
    - name: AIRFLOW__CORE__EXECUTOR
      value: LocalExecutor
    - name: AIRFLOW_HOME
      value: /opt/airflow
    - name: AIRFLOW__CORE__DAGS_FOLDER
      value: /opt/airflow/dags
    - name: AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT
      value: /opt/airflow/dags
    - name: AIRFLOW__KUBERNETES__FS_GROUP
      value: "50000"
    - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
      valueFrom:
        secretKeyRef:
          key: connection
          name: airflow-dev-airflow-metadata
    - name: AIRFLOW_CONN_AIRFLOW_DB
      valueFrom:
        secretKeyRef:
          key: connection
          name: airflow-dev-airflow-metadata
    - name: AIRFLOW__CORE__FERNET_KEY
      valueFrom:
        secretKeyRef:
          key: fernet-key
          name: airflow-dev-fernet-key
    envFrom: []
    image: docker image
    imagePullPolicy: IfNotPresent
    name: base
    ports: []
    volumeMounts:
    - mountPath: /opt/airflow/dags
      name: airflow-dags
      readOnly: false
      subPath: /repo
    - mountPath: /opt/airflow/logs
      name: airflow-logs
    - mountPath: /etc/git-secret/ssh
      name: git-sync-ssh-key
      subPath: ssh
    - mountPath: /opt/airflow/airflow.cfg
      name: airflow-config
      readOnly: true
      subPath: airflow.cfg
    - mountPath: /opt/airflow/config/airflow_local_settings.py
      name: airflow-config
      readOnly: true
      subPath: airflow_local_settings.py
  hostNetwork: false
  imagePullSecrets:
  - name: airflow-dev-registry
  initContainers:
  - env:
    - name: GIT_SYNC_REPO
      value: xxxxxxxxxxxxx
    - name: GIT_SYNC_BRANCH
      value: master
    - name: GIT_SYNC_ROOT
      value: /git
    - name: GIT_SYNC_DEST
      value: repo
    - name: GIT_SYNC_DEPTH
      value: '1'
    - name: GIT_SYNC_ONE_TIME
      value: 'true'
    - name: GIT_SYNC_REV
      value: HEAD
    - name: GIT_SSH_KEY_FILE
      value: /etc/git-secret/ssh
    - name: GIT_SYNC_ADD_USER
      value: 'true'
    - name: GIT_SYNC_SSH
      value: 'true'
    - name: GIT_KNOWN_HOSTS
      value: 'false'
    image: k8s.gcr.io/git-sync:v3.1.6
    name: git-sync
    securityContext:
      runAsUser: 65533
    volumeMounts:
    - mountPath: /git
      name: airflow-dags
      readOnly: false
    - mountPath: /etc/git-secret/ssh
      name: git-sync-ssh-key
      subPath: ssh
  nodeSelector: {}
  restartPolicy: Never
  securityContext:
    fsGroup: 50000
    runAsUser: 50000
  serviceAccountName: airflow-dev-worker-serviceaccount
  volumes:
  - emptyDir: {}
    name: airflow-dags
  - name: airflow-logs
    persistentVolumeClaim:
      claimName: dw-airflow-dev-logs
  - name: git-sync-ssh-key
    secret:
      items:
      - key: gitSshKey
        mode: 0444
        path: ssh
      secretName: airflow-private-dags-dev
  - configMap:
      name: airflow-dev-airflow-config
    name: airflow-config
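
For completeness: the scheduler picks this file up through the pod_template_file setting in the [kubernetes] section. The exact path below is an assumption based on where I mount the file, not something taken from the logs above:

AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: /opt/airflow/pod_templates/pod_template_file.yaml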