Airflow Kubernetes pod for worker can't start task

I have Airflow 2.0.2 installed on GKE.

In airflow.cfg:

executor = KubernetesExecutor
pod_template_file = /opt/airflow/pod_template_file.yaml
worker_container_repository = artifactory-address/my-team-airflow
worker_container_tag = latest
namespace = K8S_NAMESPACE
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 2
multi_namespace_mode = False
in_cluster = True
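For reference, the worker image that the executor launches is assembled from the repository and tag options above. A minimal sketch of how those two values combine (plain configparser here, not Airflow's own config loader):

```python
import configparser

# Sketch only: Airflow has its own configuration machinery, but the
# [kubernetes] options combine like this to form the worker image name.
cfg = configparser.ConfigParser()
cfg.read_string("""
[kubernetes]
worker_container_repository = artifactory-address/my-team-airflow
worker_container_tag = latest
""")

image = "{}:{}".format(
    cfg["kubernetes"]["worker_container_repository"],
    cfg["kubernetes"]["worker_container_tag"],
)
print(image)  # artifactory-address/my-team-airflow:latest
```

Because `latest` is mutable, the image actually pulled can silently differ from the image you think you built.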

In pod_template_file.yaml:

spec:
  serviceAccountName: airflow
  automountServiceAccountToken: true
  containers:
    - name: base
      imagePullPolicy: Always
      resources:
        requests:
          memory: 500Mi
          cpu: 500m
        limits:
          memory: 1000Mi
          cpu: 1000m
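(If the snippet above is the whole file: in Airflow 2.0.x the pod_template_file must be a complete Pod manifest, with apiVersion, kind, and metadata at the top level. A sketch of a full template, with the image name assumed from the airflow.cfg above:)

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker  # placeholder; the executor overrides the name
spec:
  serviceAccountName: airflow
  automountServiceAccountToken: true
  containers:
    - name: base
      image: artifactory-address/my-team-airflow:latest  # assumed from airflow.cfg
      imagePullPolicy: Always
      resources:
        requests:
          memory: 500Mi
          cpu: 500m
        limits:
          memory: 1000Mi
          cpu: 1000m
  restartPolicy: Never
```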

In the Dockerfile:

ENTRYPOINT ["/opt/airflow/entrypoint.sh"]

In entrypoint.sh:

#!/usr/bin/env bash

case "" in
  webserver)
    exec airflow webserver
    ;;
  scheduler)
    exec airflow scheduler
    ;;
  *)
    # The command is something like bash, not an airflow subcommand. Just run it in the right environment.
    exec "$@"
    ;;
esac
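The intended dispatch behavior, sketched in Python (the real script switches on "$1"): anything that is not the webserver or scheduler subcommand should pass through to exec untouched.

```python
def dispatch(argv):
    # Mirrors the case statement in entrypoint.sh: known subcommands
    # are expanded to full airflow commands, anything else is run as-is.
    if argv and argv[0] == "webserver":
        return ["airflow", "webserver"]
    if argv and argv[0] == "scheduler":
        return ["airflow", "scheduler"]
    return argv

# A worker's args should fall through to the catch-all branch unchanged.
print(dispatch(["airflow", "tasks", "run", "parser-prefix"]))
```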

I created a DAG:

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s

args = {
    'owner': 'Simon Osipov',
}

def create_pipeline(dag_):
    org_node = KubernetesPodOperator(
        namespace='my-namespace',
        image='python:3.8',
        cmds=['python', '-c'],
        arguments=['print("HELLO")'],
        labels={'foo': 'bar'},
        image_pull_policy='Always',
        name='sample-pod-task',
        task_id='sample_pod_tasks',
        image_pull_secrets=[k8s.V1LocalObjectReference('my-secret')],  # created separately
        is_delete_operator_pod=False,
        get_logs=True,
        dag=dag_,
    )

    org_node


with DAG(
    dag_id='parser-prefix',
    default_args=args,
    start_date=days_ago(2),
    tags=['my-tags']
    ) as dag:
    create_pipeline(dag)

When I run this DAG, it creates a worker pod from the worker image (my-team-airflow):

    Image:         artifactory-address/my-team-airflow:latest
    Image ID:      artifactory-address/my-team-airflow@sha256:ac0d7572f50912414fbf7df9a31dd87c20bfe1f36d8be0d3116c3c01a0992370
    Port:          <none>
    Host Port:     <none>
    Args:
      airflow
      tasks
      run
      parser-prefix
      sample_pod_tasks
      2021-05-26T09:39:50.207073+00:00
      --local
      --pool
      default_pool
      --subdir
      /opt/airflow/dags/6764b9dddbf24495327cd6eedad926ce5204a4f0/my-dag.py

As I understand it, it should run the shell script, match neither webserver nor scheduler, and therefore exec all of the arguments: airflow tasks run parser-prefix sample_pod_tasks 2021-05-26T09:39:50.207073+00:00 --local --pool default_pool --subdir /opt/airflow/dags/6764b9dddbf24495327cd6eedad926ce5204a4f0/my-dag.py

But I get this error:

airflow command error: argument GROUP_OR_COMMAND: invalid choice: 'airflow' (choose from 'celery', 'cheat-sheet', 'config', 'connections', 'dags', 'db', 'info', 'kerberos', 'kubernetes', 'plugins', 'pools', 'providers', 'roles', 'rotate-fernet-key', 'scheduler', 'sync-perm', 'tasks', 'users', 'variables', 'version', 'webserver'), see help above.
usage: airflow [-h] GROUP_OR_COMMAND ...
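A plausible explanation for the doubled command: Kubernetes runs a container as the image's ENTRYPOINT followed by the pod's args. If the image actually pulled is a stale one whose entrypoint is already the airflow binary (as in the stock apache/airflow image), the executor's args get appended after it. A sketch of that concatenation (the entrypoint values are assumptions for illustration):

```python
def container_command(entrypoint, args):
    # Kubernetes runs a container as ENTRYPOINT + args, so the pod's
    # args are appended to whatever entrypoint the image declares.
    return entrypoint + args

# Expected: the custom entrypoint.sh receives the args and execs them.
expected = container_command(["/opt/airflow/entrypoint.sh"],
                             ["airflow", "tasks", "run"])

# Hypothetical stale image whose entrypoint is already "airflow":
# the result is the doubled "airflow airflow tasks run" from the error.
stale = container_command(["airflow"], ["airflow", "tasks", "run"])
print(stale)  # ['airflow', 'airflow', 'tasks', 'run']
```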

It looks like it is trying to execute airflow airflow, but I can't figure out why.

The problem turned out to be an old image name in airflow.cfg. Always double-check your changes.