无法执行 Airflow KubernetesExecutor

Unable to execute Airflow KubernetesExecutor

here 的项目之后,我正在尝试使用 NFS 服务器作为后备存储 PV 来集成 airflow kubernetes 执行器。我有一个与 NFS 服务器链接的 PV airflow-pv。 Airflow 网络服务器和调度程序正在使用与 airflow-pv 绑定的 PVC airflow-pvc。我已将我的 dag 文件放在 NFS 服务器 /var/nfs/airflow/development/<dags/logs> 中。我也可以在网络服务器 UI 中看到新添加的 DAGS。但是,当我从 UI 执行 DAG 时,调度程序会为该任务触发一个新的 POD,但新的 worker pod 无法 运行 说

Unable to mount volumes for pod "tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873_development(976b1e64-b46d-11e9-92af-025000000001)": timeout expired waiting for volumes to attach or mount for pod "development"/"tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873". list of unmounted volumes=[airflow-dags]. list of unattached volumes=[airflow-dags airflow-logs airflow-config default-token-hjwth]

这是我的网络服务器和调度程序部署文件;

apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
  namespace: development
spec:
  type: NodePort
  ports:
    - name: web
      protocol: TCP
      port: 8080
  selector:
    app: airflow-webserver-app
    namespace: development

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-webserver-dep
  namespace: development
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-webserver-app
      namespace: development
  template:
    metadata:
      labels:
        app: airflow-webserver-app
        namespace: development
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-webserver-app
        image: airflow:externalConfigs
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 8080
        args: ["-webserver"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: AIRFLOW__CORE__FERNET_KEY
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: MYSQL_PASSWORD
        - name: MYSQL_PASSWORD
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: MYSQL_PASSWORD
        - name: DB_HOST
          value: mysql-svc.development.svc.cluster.local
        - name: DB_PORT
          value: "3306"
        - name: MYSQL_DATABASE
          value: airflow
        - name: MYSQL_USER
          value: airflow
        - name: MYSQL_PASSWORD
          value: airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        volumeMounts:
        - name: airflow-config
          mountPath: /usr/local/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-files
          mountPath: /usr/local/airflow/dags
          subPath: airflow/development/dags
        - name: airflow-files
          mountPath: /usr/local/airflow/plugins
          subPath: airflow/development/plugins
        - name: airflow-files
          mountPath: /usr/local/airflow/logs
          subPath: airflow/development/logs
        - name: airflow-files
          mountPath: /usr/local/airflow/temp
          subPath: airflow/development/temp
      volumes:
        - name: airflow-files
          persistentVolumeClaim:
            claimName: airflow-pvc
        - name: airflow-config
          configMap:
            name: airflow-config

调度程序 yaml 文件完全相同,只是容器参数是 args: ["-scheduler"]。这是我的 airflow.cfg 文件,

apiVersion: v1
kind: ConfigMap
metadata:
  name: "airflow-config"
  namespace: development
data:
  airflow.cfg: |
    [core]
    airflow_home = /usr/local/airflow
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    executor = KubernetesExecutor
    plugins_folder = /usr/local/airflow/plugins

    load_examples = false

    [scheduler]
    child_process_log_directory = /usr/local/airflow/logs/scheduler

    [webserver]
    rbac = false

    [kubernetes]
    airflow_configmap =
    worker_container_repository = airflow
    worker_container_tag = externalConfigs
    worker_container_image_pull_policy = IfNotPresent
    delete_worker_pods = true
    dags_volume_claim = airflow-pvc
    dags_volume_subpath =
    logs_volume_claim = airflow-pvc
    logs_volume_subpath =

    env_from_configmap_ref = airflow-config
    env_from_secret_ref = airflow-secrets

    in_cluster = true
    namespace = development

    [kubernetes_node_selectors]
    # the key-value pairs to be given to worker pods.
    # the worker pods will be scheduled to the nodes of the specified key-value pairs.
    # should be supplied in the format: key = value

    [kubernetes_environment_variables]
    //the below configs gets overwritten by above [kubernetes] configs
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH = var/nfs/airflow/development/dags
    AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH = var/nfs/airflow/development/logs

    [kubernetes_secrets]
    AIRFLOW__CORE__SQL_ALCHEMY_CONN = airflow-secrets=AIRFLOW__CORE__SQL_ALCHEMY_CONN
    AIRFLOW_HOME = airflow-secrets=AIRFLOW_HOME

    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = https://airflow.crunchanalytics.cloud

    [api]
    auth_backend = airflow.api.auth.backend.default

    [admin]
    # ui to hide sensitive variable fields when set to true
    hide_sensitive_variable_fields = true

触发手动任务后,调度程序的日志告诉我 KubernetesExecutorConfig() 执行时所有值都为 None。好像它没有获取配置?我已经尝试了几乎我所知道的一切,但无法让它发挥作用。有人能告诉我我想念什么吗?

[2019-08-01 14:44:22,944] {jobs.py:1341} INFO - Sending ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2019-08-01 14:44:22,944] {base_executor.py:56} INFO - Adding to queue: airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py
[2019-08-01 14:44:22,948] {kubernetes_executor.py:629} INFO - Add task ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) with command airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py with executor_config {}
[2019-08-01 14:44:22,949] {kubernetes_executor.py:379} INFO - Kubernetes job is (('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1), 'airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py', KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations={}, volumes=[], volume_mounts=[], tolerations=None))
[2019-08-01 14:44:23,042] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type ADDED
[2019-08-01 14:44:23,046] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending
[2019-08-01 14:44:23,049] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type MODIFIED
[2019-08-01 14:44:23,049] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending

供参考,这是我的PV和PVC;

kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-pv
  labels:
    mode: local
    environment: development
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: airflow-pv
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.105.225.217
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-pvc
  namespace: development
spec:
  storageClassName: airflow-pv
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
  selector:
    matchLabels:
      mode: local
      environment: development

使用 Airflow 版本:1.10.3

由于还没有答案,我将分享我目前的发现。在 kubernetes 部分下的 airflow.conf 中,我们将传递以下值

dags_volume_claim = airflow-pvc
dags_volume_subpath = airflow/development/dags
logs_volume_claim = airflow-pvc
logs_volume_subpath = airflow/development/logs

调度程序如何从上述配置创建新 pod 的方式如下(仅提及卷和 volumeMounts);

"volumes": [
  {
    "name": "airflow-dags",
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  },
  {
    "name": "airflow-logs",
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  }],
"containers": [
    { ...
  "volumeMounts": [
      {
        "name": "airflow-dags",
        "readOnly": true,
        "mountPath": "/usr/local/airflow/dags",
        "subPath": "airflow/development/dags"
      },
      {
        "name": "airflow-logs",
        "mountPath": "/usr/local/airflow/logs",
        "subPath": "airflow/development/logs"
      }]
...}]

K8s 不喜欢指向同一个 pvc (airflow-pvc) 的多个卷。要解决此问题,我会为 dags 和日志 dags_volume_claim = airflow-dags-pvclogs_volume_claim = airflow-log-pvc 创建两个 PVC(和 PV),效果很好。

我不知道新版本的气流是否已经解决了这个问题(我使用的是 1.10.3)。当人们使用相同的 PVC 然后创建一个具有单个卷和 2 个引用该卷的 volumeMounts 的 pod 时,气流调度程序应该处理这种情况,例如

"volumes": [
  {
    "name": "airflow-dags-logs",    <--just an example name
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  }
"containers": [
    { ...
  "volumeMounts": [
      {
        "name": "airflow-dags-logs",
        "readOnly": true,
        "mountPath": "/usr/local/airflow/dags",
        "subPath": "airflow/development/dags"     <--taken from configs
      },
      {
        "name": "airflow-dags-logs",
        "mountPath": "/usr/local/airflow/logs",
        "subPath": "airflow/development/logs"     <--taken from configs
      }]
...}]

我用上述配置部署了一个 pod,它工作正常!