Airflow on Kubernetes: worker pod completed but the web UI can't get the status

I ran into some problems while setting up Airflow on Kubernetes infrastructure. I referred to this blog and changed some settings for my situation. I think everything is set up correctly, but whether I trigger a run manually or on a schedule, the worker pods finish fine (I think), while the web UI never changes the task status; tasks just stay in running and queued... I'd like to know what went wrong...

Here are my configuration values.

Version info

AWS EKS ( kubernetes version: 1.21 )
Airflow ( 2.2.3 )
Python 3.8

container.yaml

#  Licensed to the Apache Software Foundation (ASF) under one   *
#  or more contributor license agreements.  See the NOTICE file *
#  distributed with this work for additional information        *
#  regarding copyright ownership.  The ASF licenses this file   *
#  to you under the Apache License, Version 2.0 (the            *
#  "License"); you may not use this file except in compliance   *
#  with the License.  You may obtain a copy of the License at   *
#                                                               *
#    http://www.apache.org/licenses/LICENSE-2.0                 *
#                                                               *
#  Unless required by applicable law or agreed to in writing,   *
#  software distributed under the License is distributed on an  *
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
#  KIND, either express or implied.  See the License for the    *
#  specific language governing permissions and limitations      *
#  under the License.                                           *

# Note: The airflow image used in this example is obtained by   *
# building the image from the local docker subdirectory.        *
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: [ "" ]
    resources: [ "pods/log" ]
    verbs: [ "get", "list" ]
  - apiGroups: [ "" ]
    resources: [ "pods/exec" ]
    verbs: [ "create", "get" ]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
  namespace: airflow
subjects:
  - kind: ServiceAccount
    name: airflow # Name of the ServiceAccount
    namespace: airflow
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: airflow # This must match the name of the Role
                #   or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: lifecycle
                operator: NotIn
                values:
                - Ec2Spot
      initContainers:
      - name: "init"
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/bash", "-c"]
        args:
          - /tmp/airflow-test-env-init.sh {{INIT_GIT_SYNC}};
      containers:
      - name: webserver
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        ports:
        - name: webserver
          containerPort: 8080
        args: ["webserver"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/logs
      - name: scheduler
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        args: ["scheduler"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/logs
      - name: git-sync
        image: k8s.gcr.io/git-sync/git-sync:v3.4.0
        imagePullPolicy: IfNotPresent
        envFrom:
          - configMapRef:
              name: airflow-gitsync
          - secretRef:
              name: airflow-secrets
        volumeMounts:
          - name: {{POD_AIRFLOW_VOLUME_NAME}}
            mountPath: /git
      volumes:
      - name: {{INIT_DAGS_VOLUME_NAME}}
        emptyDir: {}
      - name: {{POD_AIRFLOW_VOLUME_NAME}}
        persistentVolumeClaim:
          claimName: airflow-efs-pvc
      - name: airflow-dags-fake
        emptyDir: {}
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
      securityContext:
        runAsUser: 50000
        fsGroup: 0
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
    service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
    service.beta.kubernetes.io/aws-load-balancer-ssl-cert: {{AOK_SSL_ENDPOINT}}
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
      nodePort: 30031
      name: http
    - protocol: TCP
      port: 443
      targetPort: 8080
      nodePort: 30032
      name: https
  selector:
    name: airflow

airflow.cfg


---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-gitsync
  namespace: airflow
data:
  GIT_SYNC_REPO: GITREPO
  GIT_SYNC_BRANCH: xxx
  GIT_SYNC_ROOT: xxx
  GIT_SYNC_DEST: xxx
  GIT_SYNC_DEPTH: xxx
  GIT_SYNC_ONE_TIME: "false"
  GIT_SYNC_WAIT: "60"
  GIT_SYNC_USERNAME: xxx
  GIT_SYNC_PERMISSIONS: xxx
  GIT_KNOWN_HOSTS: "false"
  GIT_SYNC_PASSWORD: xxx
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  namespace: airflow
data:
  airflow.cfg: |
    [core]
    dags_folder = {{CONFIGMAP_DAGS_FOLDER}}
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = True
    plugins_folder = /opt/airflow/plugins
    sql_alchemy_conn = $SQL_ALCHEMY_CONN
    hide_sensitive_variable_fields = True

    [logging]
    logging_level = DEBUG
    base_log_folder = /opt/airflow/logs
    remote_logging = False 
    remote_log_conn_id = my_s3_conn
    remote_base_log_folder = s3://airflow/logs

    [scheduler]
    dag_dir_list_interval = 300
    child_process_log_directory = /opt/airflow/logs/scheduler
    # Task instances listen for external kill signal (when you clear tasks
    # from the CLI or the UI), this defines the frequency at which they should
    # listen (in seconds).
    job_heartbeat_sec = 5
    max_threads = 2

    # The scheduler constantly tries to trigger new tasks (look at the
    # scheduler section in the docs for more information). This defines
    # how often the scheduler should run (in seconds).
    scheduler_heartbeat_sec = 5

    # after how much time a new DAGs should be picked up from the filesystem
    min_file_process_interval = 0

    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow

    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
    min_file_parsing_loop_time = 1

    print_stats_interval = 30
    scheduler_zombie_task_threshold = 300
    max_tis_per_query = 0
    authenticate = False

    catchup_by_default = True

    [webserver]
    base_url = http://0.0.0.0:8080
    rbac=True

    # The ip specified when starting the web server
    web_server_host = 0.0.0.0

    # The port on which to run the web server
    web_server_port = 8080

    # Paths to the SSL certificate and key for the web server. When both are
    # provided SSL will be enabled. This does not change the web server port.
    web_server_ssl_cert =
    web_server_ssl_key =

    # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
    web_server_master_timeout = 120

    # Number of seconds the gunicorn webserver waits before timing out on a worker
    web_server_worker_timeout = 120

    # Number of workers to refresh at a time. When set to 0, worker refresh is
    # disabled. When nonzero, airflow periodically refreshes webserver workers by
    # bringing up new ones and killing old ones.
    worker_refresh_batch_size = 1

    # Number of seconds to wait before refreshing a batch of workers.
    worker_refresh_interval = 30

    # Secret key used to run your flask app
    secret_key = xxxxxxxxxxx

    # Number of workers to run the Gunicorn web server
    workers = 4

    # The worker class gunicorn should use. Choices include
    # sync (default), eventlet, gevent
    worker_class = sync

    # Log files for the gunicorn webserver. '-' means log to stderr.
    access_logfile = -
    error_logfile = -

    # Expose the configuration file in the web server
    expose_config = False

    # Default DAG view.  Valid values are:
    # tree, graph, duration, gantt, landing_times
    dag_default_view = tree

    # Default DAG orientation. Valid values are:
    # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
    dag_orientation = LR

    # Puts the webserver in demonstration mode; blurs the names of Operators for
    # privacy.
    demo_mode = False

    # The amount of time (in secs) webserver will wait for initial handshake
    # while fetching logs from other worker machine
    log_fetch_timeout_sec = 5

    # By default, the webserver shows paused DAGs. Flip this to hide paused
    # DAGs by default
    hide_paused_dags_by_default = False

    # Consistent page size across all listing views in the UI
    page_size = 100

    [metrics]
    statsd_on = False

    [operators]
    # Default queue that tasks get assigned to and that worker listen on.
    default_queue = default

    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = localhost
    smtp_starttls = True
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    # smtp_user = airflow
    # smtp_password = airflow
    smtp_port = 25
    smtp_mail_from = airflow@example.com

    [kubernetes]
    run_as_user = 50000
    airflow_configmap = airflow-configmap
    worker_container_repository = {{AIRFLOW_IMAGE}}
    worker_container_tag = {{AIRFLOW_TAG}}
    worker_container_image_pull_policy = IfNotPresent
    worker_service_account_name = airflow
    worker_dags_folder=/opt/airflow/dags
    namespace = airflow
    pod_template_file = /opt/airflow/yamls/pod_template.yaml
    delete_worker_pods = False
    delete_worker_pods_on_failure = False
    dags_in_image = False
    git_repo = xxxx
    git_branch = xxx
    git_subpath = airflow/contrib/example_dags/
    git_sync_depth = xxx
    git_user = xxx
    git_password = {{CONFIGMAP_GIT_PASSWORD}}
    git_sync_root = /git
    git_sync_dest = projects
    git_dags_folder_mount_point = {{CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}}
    dags_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}}
    dags_volume_subpath =
    logs_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}} 
    logs_volume_subpath =
    dags_volume_host =
    logs_volume_host =
    in_cluster = True
    gcp_service_account_keys =
    enable_tcp_keepalive = True

    # Example affinity and toleration definitions.
    affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"weight":1,"preference":[{"matchExpressions":[{"key":"lifecycle","operator":"In","value":"Ec2Spot"}]}],"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    tolerations = [{ "key": "spotInstance", "operator": "Equal", "value": "true", "effect": "PreferNoSchedule" },{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
    # affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    # tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]

    # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
    git_sync_container_repository = k8s.gcr.io/git-sync
    git_sync_container_tag = v3.3.0
    git_sync_init_container_name = git-sync-clone

    [kubernetes_node_selectors]
    # The Key-value pairs to be given to worker pods.
    # The worker pods will be scheduled to the nodes of the specified key-value pairs.
    # Should be supplied in the format: key = value

    [kubernetes_annotations]
    # The Key-value annotations pairs to be given to worker pods.
    # Should be supplied in the format: key = value

    [kubernetes_secrets]
    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

    [hive]
    # Default mapreduce queue for HiveOperator tasks
    default_hive_mapred_queue =

    [celery]
    # This section only applies if you are using the CeleryExecutor in
    # [core] section above

    # The app name that will be used by celery
    celery_app_name = airflow.executors.celery_executor

    worker_concurrency = 16

    worker_log_server_port = 8793

    broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
    # result backend settings
    result_backend = db+mysql://airflow:airflow@localhost:3306/airflow

    flower_host = 0.0.0.0

    # The root URL for Flower
    # Ex: flower_url_prefix = /flower
    flower_url_prefix =

    # This defines the port that Celery Flower runs on
    flower_port = 5555

    # Securing Flower with Basic Authentication
    # Accepts user:password pairs separated by a comma
    # Example: flower_basic_auth = user1:password1,user2:password2
    flower_basic_auth =


    # How many processes CeleryExecutor uses to sync task state.
    # 0 means to use max(1, number of cores - 1) processes.
    sync_parallelism = 0

    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

    visibility_timeout = 21600



    [ldap]
    # set this to ldaps://<your.ldap.server>:<port>
    uri =
    user_filter = objectClass=*
    user_name_attr = uid
    group_member_attr = memberOf
    superuser_filter =
    data_profiler_filter =
    bind_user = cn=Manager,dc=example,dc=com
    bind_password = insecure
    basedn = dc=example,dc=com
    cacert = /etc/ca/ldap_ca.crt
    search_scope = LEVEL

    [kerberos]
    ccache = /tmp/airflow_krb5_ccache
    # gets augmented with fqdn
    principal = airflow
    reinit_frequency = 3600
    kinit_path = kinit
    keytab = airflow.keytab

    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = http://0.0.0.0:8080

    [api]
    auth_backend = airflow.api.auth.backend.default

    [github_enterprise]
    api_rev = v3

    [admin]
    # UI to hide sensitive variable fields when set to True

    [elasticsearch]
    elasticsearch_host =

    [tests]
    unit_test_mode = False

volume.yaml

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: efs.csi.aws.com
spec:
  attachRequired: false
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc
provisioner: efs.csi.aws.com
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-efs-pv
spec:
  capacity:
    storage: 100Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: efs-sc
  csi:
    driver: efs.csi.aws.com
    volumeHandle: {{AOK_EFS_FS_ID}}::{{AOK_EFS_AP}}
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-efs-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: efs-sc
  resources:
    requests:
      storage: 3Gi

pod_template.yaml

---
apiVersion: v1
kind: Pod
metadata:
  name: airflow
  namespace: airflow
spec:
  serviceAccountName: airflow # this account has the rights to create pods
  automountServiceAccountToken: true
  initContainers:
  - name: git-sync-clone
    image: xxx
    imagePullPolicy: IfNotPresent
    envFrom:
      - configMapRef:
          name: airflow-gitsync
      - secretRef:
          name: airflow-secrets
    volumeMounts:
      - name: airflow-dags
        mountPath: /opt/airflow/dags
      - name: airflow-configmap
        mountPath: /opt/airflow/airflow.cfg
        subPath: airflow.cfg
    env:
    - name: SQL_ALCHEMY_CONN
      valueFrom:
        secretKeyRef:
          name: airflow-secrets
          key: sql_alchemy_conn
    - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
      value: "false"
  containers:
  - name: base
    image: xxx
    imagePullPolicy: IfNotPresent
    volumeMounts:
      - name: airflow-dags
        mountPath: /opt/airflow/logs
      - name: airflow-dags
        mountPath: /opt/airflow/dags
        readOnly: true
      - name: airflow-configmap
        mountPath: /opt/airflow/airflow.cfg
        readOnly: true
        subPath: airflow.cfg
    env:
      - name: AIRFLOW_HOME
        value: /opt/airflow
      - name: AIRFLOW__CORE__EXECUTOR
        value: LocalExecutor
      - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
        value: "false"
      - name: AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE
        value: "true"
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-efs-pvc
    - name: airflow-logs
      emptyDir: {}
    - name: airflow-configmap
      configMap:
        name: airflow-configmap

Here is the pod status:

NAME                                                               READY   STATUS      RESTARTS   AGE
airflow-85fdc74b9d-88gvn                                           3/3     Running     0          9m40s
examplebashoperatoralsorunthis.3190630823b84cc4a6eba09544c303a2    0/1     Completed   0          26s
examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d         0/1     Completed   0          28s
examplebashoperatorrunme1.a16ac8f949414b93bd8f39d9e67cee3a         0/1     Completed   0          29s
examplebashoperatorrunme2.a79ebbb01cd149b688285a857ed01a17         0/1     Completed   0          29s
examplebashoperatorthiswillskip.41b60a9693b2463f97af35b72a32b082   0/1     Completed   0          27s

And here is the status shown in my web UI: my web ui

I would like to know why this status is not synced correctly... and I also can't fetch the worker pod logs...
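
For reference, since delete_worker_pods = False keeps the finished pods around, I assume the container logs should still be retrievable with something like the following (the pod name is taken from the listing above):

# fetch logs and details of one of the completed worker pods
kubectl logs -n airflow examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d
kubectl describe pod -n airflow examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d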

The problem is with the Airflow Docker image you are using.

The ENTRYPOINT I see is a custom .sh file you wrote that decides whether to run the webserver or the scheduler.
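
You can confirm this by inspecting the image locally. A minimal check, assuming Docker is available and using placeholders for your image name and tag:

# print the ENTRYPOINT and CMD baked into the image (replace the placeholders)
docker pull <your-airflow-image>:<tag>
docker inspect --format 'ENTRYPOINT={{.Config.Entrypoint}} CMD={{.Config.Cmd}}' <your-airflow-image>:<tag>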

The Airflow scheduler submits a pod for the task with args like the following:

containers:
  - args:
    - airflow
    - tasks
    - run
    - hello-world-dag
    - print_date_dag
    - scheduled__2022-03-13T00:00:00+00:00
    - --local
    - --subdir
    - DAGS_FOLDER/sample_dags/hello_world.py
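
You can read these args back from one of the completed worker pods; a quick check, with the pod name taken from the kubectl get pods output in the question:

# show the args the scheduler set on a finished worker pod
kubectl get pod -n airflow examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d \
  -o jsonpath='{.spec.containers[0].args}'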

The Airflow image you are using does not know what to do with this command, because the .sh file has no case to handle it: the first argument is neither scheduler nor webserver.

As a result, the scheduler submits the pod, but the pod exits without doing anything because it never actually runs the task. As far as Kubernetes is concerned the pod succeeded, yet Airflow never receives the task status, so it updates the task table in the Airflow metadata database with a None state for that task.
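
You can check what Airflow recorded for one of those tasks with the CLI from inside the scheduler container; a minimal check, where the execution date / run id is a placeholder you need to fill in:

# query the task state stored in the Airflow metadata database
kubectl exec -n airflow deploy/airflow -c scheduler -- \
  airflow tasks state example_bash_operator runme_0 <execution-date-or-run-id>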

Please use the following entrypoint file to make it work:

#!/usr/bin/env bash

# launch the appropriate process

if [ "" = "webserver" ]
then
    exec airflow webserver
elif [ "" = "scheduler" ]
then
    exec airflow scheduler
else
    exec "$@"
fi
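
After rebuilding the image with this entrypoint, a quick sanity check (the image name and tag are placeholders) is to verify that arbitrary commands now fall through to exec "$@" instead of being ignored:

# any command other than "webserver" or "scheduler" should now be executed as-is
docker run --rm <your-airflow-image>:<tag> airflow version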