Airflow Kubernetes pod for worker can't start task
I have Airflow 2.0.2 installed on GKE.

In airflow.cfg:
executor = KubernetesExecutor
pod_template_file = /opt/airflow/pod_template_file.yaml
worker_container_repository = artifactory-address/my-team-airflow
worker_container_tag = latest
namespace = K8S_NAMESPACE
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 2
multi_namespace_mode = False
in_cluster = True
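As a quick sanity check on these values (a debugging sketch, not part of the original setup), the running installation can be asked what it actually resolved for the worker image; in Airflow 2.0.x these options live under the [kubernetes] section:

airflow config get-value kubernetes worker_container_repository
airflow config get-value kubernetes worker_container_tag

If the echoed values differ from what you expect, the scheduler is reading a different or stale config than the one you edited.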
In pod_template_file.yaml:
spec:
  serviceAccountName: airflow
  automountServiceAccountToken: true
  containers:
    - name: base
      imagePullPolicy: Always
      resources:
        requests:
          memory: 500Mi
          cpu: 500m
        limits:
          memory: 1000Mi
          cpu: 1000m
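To see the pod spec the executor would actually build from this template, the 2.0.x CLI has a kubernetes command group (visible in the error output further down); a sketch, assuming the generate-dag-yaml subcommand is available in this version:

airflow kubernetes generate-dag-yaml parser-prefix 2021-05-26 -o /tmp/pods

This renders the worker pod YAML for each task without launching anything, which makes template and indentation mistakes visible before a pod ever starts.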
In the Dockerfile:
ENTRYPOINT ["/opt/airflow/entrypoint.sh"]
In entrypoint.sh:
#!/usr/bin/env bash
case "$1" in
  webserver)
    exec airflow webserver
    ;;
  scheduler)
    exec airflow scheduler
    ;;
  *)
    # The command is something like bash, not an airflow subcommand. Just run it in the right environment.
    exec "$@"
    ;;
esac
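Since everything other than webserver and scheduler falls through to exec "$@", the entrypoint can be exercised locally with a plain docker run (a minimal sketch, using the image name from airflow.cfg):

docker run --rm artifactory-address/my-team-airflow:latest airflow version

With this ENTRYPOINT, the trailing arguments become "$@" inside entrypoint.sh, so the container should print the Airflow version and exit; if it fails the same way the worker pod does, the image itself is suspect.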
I created a DAG:
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s

args = {
    'owner': 'Simon Osipov',
}

def create_pipeline(dag_):
    org_node = KubernetesPodOperator(
        namespace='my-namespace',
        image='python:3.8',
        cmds=['python', '-c'],
        arguments=['print("HELLO")'],
        labels={'foo': 'bar'},
        image_pull_policy='Always',
        name='sample-pod-task',
        task_id='sample_pod_tasks',
        image_pull_secrets=[k8s.V1LocalObjectReference('my-secret')],  # created separately
        is_delete_operator_pod=False,
        get_logs=True,
        dag=dag_
    )
    org_node

with DAG(
    dag_id='parser-prefix',
    default_args=args,
    start_date=days_ago(2),
    tags=['my-tags']
) as dag:
    create_pipeline(dag)
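The task itself can be checked without going through the executor using the standard 2.0 CLI (a sketch; it still needs cluster access, since KubernetesPodOperator launches a pod either way):

airflow tasks test parser-prefix sample_pod_tasks 2021-05-26

tasks test runs the single task in-process and skips the worker-pod layer, which helps separate DAG problems from KubernetesExecutor problems.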
When I run this DAG, it creates a worker pod with the worker image (my-team-airflow):
Image: artifactory-address/my-team-airflow:latest
Image ID: artifactory-address/my-team-airflow@sha256:ac0d7572f50912414fbf7df9a31dd87c20bfe1f36d8be0d3116c3c01a0992370
Port: <none>
Host Port: <none>
Args:
  airflow
  tasks
  run
  parser-prefix
  sample_pod_tasks
  2021-05-26T09:39:50.207073+00:00
  --local
  --pool
  default_pool
  --subdir
  /opt/airflow/dags/6764b9dddbf24495327cd6eedad926ce5204a4f0/my-dag.py
As I understand it, the pod should run the entrypoint script, match neither webserver nor scheduler, and exec all of the arguments: airflow tasks run parser-prefix sample_pod_tasks 2021-05-26T09:39:50.207073+00:00 --local --pool default_pool --subdir /opt/airflow/dags/6764b9dddbf24495327cd6eedad926ce5204a4f0/my-dag.py
But I get an error:
airflow command error: argument GROUP_OR_COMMAND: invalid choice: 'airflow' (choose from 'celery', 'cheat-sheet', 'config', 'connections', 'dags', 'db', 'info', 'kerberos', 'kubernetes', 'plugins', 'pools', 'providers', 'roles', 'rotate-fernet-key', 'scheduler', 'sync-perm', 'tasks', 'users', 'variables', 'version', 'webserver'), see help above.
usage: airflow [-h] GROUP_OR_COMMAND ...
It seems to be executing airflow airflow, but I can't figure out why.
The problem was an old image name in airflow.cfg. The worker pod was evidently still being built from a stale image whose entrypoint invoked airflow directly, so the worker arguments airflow tasks run ... became airflow airflow tasks run .... Make sure to double-check your changes.
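One way to verify this kind of mismatch (a sketch; <worker-pod> stands for whatever name the executor generated, which stays around here because delete_worker_pods_on_failure = False keeps failed pods):

kubectl -n K8S_NAMESPACE get pod <worker-pod> -o jsonpath='{.spec.containers[0].image}'
docker pull artifactory-address/my-team-airflow:latest
docker inspect --format '{{.Config.Entrypoint}} {{.Config.Cmd}}' artifactory-address/my-team-airflow:latest

If the inspected entrypoint is airflow itself rather than /opt/airflow/entrypoint.sh, the worker args airflow tasks run ... get appended after it, producing exactly the airflow airflow ... invocation from the error.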