Airflow on Kubernetes: worker pod completed but the web UI can't get the status
I ran into some problems while setting up Airflow on our Kubernetes infrastructure.
I followed this blog and changed some of the settings for my environment.
I think everything is set up correctly, but whenever a run starts, manually or on a schedule, the worker pods finish fine (I think), yet the web UI never updates the task status; tasks just stay in running and queued...
I'd like to know where I went wrong...
Here are my settings.
Version info
AWS EKS ( kubernetes version: 1.21 )
Airflow ( 2.2.3 )
Python 3.8
container.yaml
# Licensed to the Apache Software Foundation (ASF) under one *
# or more contributor license agreements. See the NOTICE file *
# distributed with this work for additional information *
# regarding copyright ownership. The ASF licenses this file *
# to you under the Apache License, Version 2.0 (the *
# "License"); you may not use this file except in compliance *
# with the License. You may obtain a copy of the License at *
# *
# http://www.apache.org/licenses/LICENSE-2.0 *
# *
# Unless required by applicable law or agreed to in writing, *
# software distributed under the License is distributed on an *
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
# KIND, either express or implied. See the License for the *
# specific language governing permissions and limitations *
# under the License. *
# Note: The airflow image used in this example is obtained by *
# building the image from the local docker subdirectory. *
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow
rules:
  - apiGroups: [""]  # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "list"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["create", "get"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
  namespace: airflow
subjects:
  - kind: ServiceAccount
    name: airflow  # Name of the ServiceAccount
    namespace: airflow
roleRef:
  kind: Role  # This must be Role or ClusterRole
  name: airflow  # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: lifecycle
                    operator: NotIn
                    values:
                      - Ec2Spot
      initContainers:
        - name: "init"
          image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /opt/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /opt/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command: ["/bin/bash", "-c"]
          args:
            - /tmp/airflow-test-env-init.sh {{INIT_GIT_SYNC}};
      containers:
        - name: webserver
          image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
          imagePullPolicy: Always
          ports:
            - name: webserver
              containerPort: 8080
          args: ["webserver"]
          env:
            - name: AIRFLOW_KUBE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          volumeMounts:
            - name: airflow-configmap
              mountPath: /opt/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /opt/airflow/dags
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /opt/airflow/logs
        - name: scheduler
          image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
          imagePullPolicy: Always
          args: ["scheduler"]
          env:
            - name: AIRFLOW_KUBE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          volumeMounts:
            - name: airflow-configmap
              mountPath: /opt/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /opt/airflow/dags
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /opt/airflow/logs
        - name: git-sync
          image: k8s.gcr.io/git-sync/git-sync:v3.4.0
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: airflow-gitsync
            - secretRef:
                name: airflow-secrets
          volumeMounts:
            - name: {{POD_AIRFLOW_VOLUME_NAME}}
              mountPath: /git
      volumes:
        - name: {{INIT_DAGS_VOLUME_NAME}}
          emptyDir: {}
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          persistentVolumeClaim:
            claimName: airflow-efs-pvc
        - name: airflow-dags-fake
          emptyDir: {}
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
      securityContext:
        runAsUser: 50000
        fsGroup: 0
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
    service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
    service.beta.kubernetes.io/aws-load-balancer-ssl-cert: {{AOK_SSL_ENDPOINT}}
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
      nodePort: 30031
      name: http
    - protocol: TCP
      port: 443
      targetPort: 8080
      nodePort: 30032
      name: https
  selector:
    name: airflow
airflow.cfg
# Licensed to the Apache Software Foundation (ASF) under one *
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-gitsync
  namespace: airflow
data:
  GIT_SYNC_REPO: GITREPO
  GIT_SYNC_BRANCH: xxx
  GIT_SYNC_ROOT: xxx
  GIT_SYNC_DEST: xxx
  GIT_SYNC_DEPTH: xxx
  GIT_SYNC_ONE_TIME: "false"
  GIT_SYNC_WAIT: "60"
  GIT_SYNC_USERNAME: xxx
  GIT_SYNC_PERMISSIONS: xxx
  GIT_KNOWN_HOSTS: "false"
  GIT_SYNC_PASSWORD: xxx
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  namespace: airflow
data:
  airflow.cfg: |
    [core]
    dags_folder = {{CONFIGMAP_DAGS_FOLDER}}
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = True
    plugins_folder = /opt/airflow/plugins
    sql_alchemy_conn = $SQL_ALCHEMY_CONN
    hide_sensitive_variable_fields = True
    [logging]
    logging_level = DEBUG
    base_log_folder = /opt/airflow/logs
    remote_logging = False
    remote_log_conn_id = my_s3_conn
    remote_base_log_folder = s3://airflow/logs
    [scheduler]
    dag_dir_list_interval = 300
    child_process_log_directory = /opt/airflow/logs/scheduler
    # Task instances listen for external kill signal (when you clear tasks
    # from the CLI or the UI), this defines the frequency at which they should
    # listen (in seconds).
    job_heartbeat_sec = 5
    max_threads = 2
    # The scheduler constantly tries to trigger new tasks (look at the
    # scheduler section in the docs for more information). This defines
    # how often the scheduler should run (in seconds).
    scheduler_heartbeat_sec = 5
    # after how much time a new DAGs should be picked up from the filesystem
    min_file_process_interval = 0
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow
    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
    min_file_parsing_loop_time = 1
    print_stats_interval = 30
    scheduler_zombie_task_threshold = 300
    max_tis_per_query = 0
    authenticate = False
    catchup_by_default = True
    [webserver]
    base_url = http://0.0.0.0:8080
    rbac = True
    # The ip specified when starting the web server
    web_server_host = 0.0.0.0
    # The port on which to run the web server
    web_server_port = 8080
    # Paths to the SSL certificate and key for the web server. When both are
    # provided SSL will be enabled. This does not change the web server port.
    web_server_ssl_cert =
    web_server_ssl_key =
    # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
    web_server_master_timeout = 120
    # Number of seconds the gunicorn webserver waits before timing out on a worker
    web_server_worker_timeout = 120
    # Number of workers to refresh at a time. When set to 0, worker refresh is
    # disabled. When nonzero, airflow periodically refreshes webserver workers by
    # bringing up new ones and killing old ones.
    worker_refresh_batch_size = 1
    # Number of seconds to wait before refreshing a batch of workers.
    worker_refresh_interval = 30
    # Secret key used to run your flask app
    secret_key = xxxxxxxxxxx
    # Number of workers to run the Gunicorn web server
    workers = 4
    # The worker class gunicorn should use. Choices include
    # sync (default), eventlet, gevent
    worker_class = sync
    # Log files for the gunicorn webserver. '-' means log to stderr.
    access_logfile = -
    error_logfile = -
    # Expose the configuration file in the web server
    expose_config = False
    # Default DAG view. Valid values are:
    # tree, graph, duration, gantt, landing_times
    dag_default_view = tree
    # Default DAG orientation. Valid values are:
    # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
    dag_orientation = LR
    # Puts the webserver in demonstration mode; blurs the names of Operators for
    # privacy.
    demo_mode = False
    # The amount of time (in secs) webserver will wait for initial handshake
    # while fetching logs from other worker machine
    log_fetch_timeout_sec = 5
    # By default, the webserver shows paused DAGs. Flip this to hide paused
    # DAGs by default
    hide_paused_dags_by_default = False
    # Consistent page size across all listing views in the UI
    page_size = 100
    [metrics]
    statsd_on = False
    [operators]
    # Default queue that tasks get assigned to and that worker listen on.
    default_queue = default
    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = localhost
    smtp_starttls = True
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    # smtp_user = airflow
    # smtp_password = airflow
    smtp_port = 25
    smtp_mail_from = airflow@example.com
    [kubernetes]
    run_as_user = 50000
    airflow_configmap = airflow-configmap
    worker_container_repository = {{AIRFLOW_IMAGE}}
    worker_container_tag = {{AIRFLOW_TAG}}
    worker_container_image_pull_policy = IfNotPresent
    worker_service_account_name = airflow
    worker_dags_folder = /opt/airflow/dags
    namespace = airflow
    pod_template_file = /opt/airflow/yamls/pod_template.yaml
    delete_worker_pods = False
    delete_worker_pods_on_failure = False
    dags_in_image = False
    git_repo = xxxx
    git_branch = xxx
    git_subpath = airflow/contrib/example_dags/
    git_sync_depth = xxx
    git_user = xxx
    git_password = {{CONFIGMAP_GIT_PASSWORD}}
    git_sync_root = /git
    git_sync_dest = projects
    git_dags_folder_mount_point = {{CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}}
    dags_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}}
    dags_volume_subpath =
    logs_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}}
    logs_volume_subpath =
    dags_volume_host =
    logs_volume_host =
    in_cluster = True
    gcp_service_account_keys =
    enable_tcp_keepalive = True
    # Example affinity and toleration definitions.
    affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"weight":1,"preference":[{"matchExpressions":[{"key":"lifecycle","operator":"In","value":"Ec2Spot"}]}],"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    tolerations = [{ "key": "spotInstance", "operator": "Equal", "value": "true", "effect": "PreferNoSchedule" },{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
    # affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    # tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
    # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
    git_sync_container_repository = k8s.gcr.io/git-sync
    git_sync_container_tag = v3.3.0
    git_sync_init_container_name = git-sync-clone
    [kubernetes_node_selectors]
    # The Key-value pairs to be given to worker pods.
    # The worker pods will be scheduled to the nodes of the specified key-value pairs.
    # Should be supplied in the format: key = value
    [kubernetes_annotations]
    # The Key-value annotations pairs to be given to worker pods.
    # Should be supplied in the format: key = value
    [kubernetes_secrets]
    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn
    [hive]
    # Default mapreduce queue for HiveOperator tasks
    default_hive_mapred_queue =
    [celery]
    # This section only applies if you are using the CeleryExecutor in
    # [core] section above
    # The app name that will be used by celery
    celery_app_name = airflow.executors.celery_executor
    worker_concurrency = 16
    worker_log_server_port = 8793
    broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
    # result backend settings
    result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
    flower_host = 0.0.0.0
    # The root URL for Flower
    # Ex: flower_url_prefix = /flower
    flower_url_prefix =
    # This defines the port that Celery Flower runs on
    flower_port = 5555
    # Securing Flower with Basic Authentication
    # Accepts user:password pairs separated by a comma
    # Example: flower_basic_auth = user1:password1,user2:password2
    flower_basic_auth =
    # How many processes CeleryExecutor uses to sync task state.
    # 0 means to use max(1, number of cores - 1) processes.
    sync_parallelism = 0
    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
    visibility_timeout = 21600
    [ldap]
    # set this to ldaps://<your.ldap.server>:<port>
    uri =
    user_filter = objectClass=*
    user_name_attr = uid
    group_member_attr = memberOf
    superuser_filter =
    data_profiler_filter =
    bind_user = cn=Manager,dc=example,dc=com
    bind_password = insecure
    basedn = dc=example,dc=com
    cacert = /etc/ca/ldap_ca.crt
    search_scope = LEVEL
    [kerberos]
    ccache = /tmp/airflow_krb5_ccache
    # gets augmented with fqdn
    principal = airflow
    reinit_frequency = 3600
    kinit_path = kinit
    keytab = airflow.keytab
    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = http://0.0.0.0:8080
    [api]
    auth_backend = airflow.api.auth.backend.default
    [github_enterprise]
    api_rev = v3
    [admin]
    # UI to hide sensitive variable fields when set to True
    [elasticsearch]
    elasticsearch_host =
    [tests]
    unit_test_mode = False
volume.yaml
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: efs.csi.aws.com
spec:
  attachRequired: false
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc
provisioner: efs.csi.aws.com
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-efs-pv
spec:
  capacity:
    storage: 100Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: efs-sc
  csi:
    driver: efs.csi.aws.com
    volumeHandle: {{AOK_EFS_FS_ID}}::{{AOK_EFS_AP}}
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-efs-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: efs-sc
  resources:
    requests:
      storage: 3Gi
pod_template.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: airflow
  namespace: airflow
spec:
  serviceAccountName: airflow  # this account has rights to create pods
  automountServiceAccountToken: true
  initContainers:
    - name: git-sync-clone
      image: xxx
      imagePullPolicy: IfNotPresent
      envFrom:
        - configMapRef:
            name: airflow-gitsync
        - secretRef:
            name: airflow-secrets
      volumeMounts:
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
      env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: "false"
  containers:
    - name: base
      image: xxx
      imagePullPolicy: IfNotPresent
      volumeMounts:
        - name: airflow-dags
          mountPath: /opt/airflow/logs
        - name: airflow-dags
          mountPath: /opt/airflow/dags
          readOnly: true
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          readOnly: true
          subPath: airflow.cfg
      env:
        - name: AIRFLOW_HOME
          value: /opt/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: "false"
        - name: AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE
          value: "true"
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-efs-pvc
    - name: airflow-logs
      emptyDir: {}
    - name: airflow-configmap
      configMap:
        name: airflow-configmap
Here is the pod status:
NAME READY STATUS RESTARTS AGE
airflow-85fdc74b9d-88gvn 3/3 Running 0 9m40s
examplebashoperatoralsorunthis.3190630823b84cc4a6eba09544c303a2 0/1 Completed 0 26s
examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d 0/1 Completed 0 28s
examplebashoperatorrunme1.a16ac8f949414b93bd8f39d9e67cee3a 0/1 Completed 0 29s
examplebashoperatorrunme2.a79ebbb01cd149b688285a857ed01a17 0/1 Completed 0 29s
examplebashoperatorthiswillskip.41b60a9693b2463f97af35b72a32b082 0/1 Completed 0 27s
And here is my web UI status:
(screenshot: my web ui)
I'd like to know why the status isn't being synced correctly... and I also can't get the worker pod logs...
The problem is with the Airflow Docker image you are using.
The ENTRYPOINT I can see is a custom .sh file you wrote that decides whether to run the webserver or the scheduler.
The Airflow scheduler submits a pod for the task with args like this:
containers:
  - args:
      - airflow
      - tasks
      - run
      - hello-world-dag
      - print_date_dag
      - scheduled__2022-03-13T00:00:00+00:00
      - --local
      - --subdir
      - DAGS_FOLDER/sample_dags/hello_world.py
The Airflow image you are using doesn't know what to do with this command, because your .sh file has no case that handles it: the first argument is neither scheduler nor webserver.
So the scheduler submits the pod, and the pod exits without doing anything because it never actually runs the task. As far as Kubernetes is concerned the pod succeeded, but Airflow never hears back about the task state, so it updates the task table in the Airflow metadata database to a None state for that task.
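Since you set delete_worker_pods = False, the completed worker pods are still around, so you can check this yourself. A couple of kubectl commands for that (the pod name is just one taken from your listing above, and "base" is the container name from your pod_template.yaml):

# Show the args the scheduler injected into the worker pod (should be "airflow tasks run ...")
kubectl get pod examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d -n airflow \
  -o jsonpath='{.spec.containers[0].args}'

# Show what the container actually printed; with an entrypoint that ignores
# unknown arguments there will be no task-run output here
kubectl logs examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d -n airflow -c base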
Please use the following entrypoint file to make it work:
#!/usr/bin/env bash
# launch the appropriate process
if [ "$1" = "webserver" ]
then
  exec airflow webserver
elif [ "$1" = "scheduler" ]
then
  exec airflow scheduler
else
  exec "$@"
fi
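For reference, a minimal sketch of how such an entrypoint is usually wired into a custom image; the base image tag and file name here are assumptions, so adjust them to however you actually build your image:

# Dockerfile (sketch)
FROM apache/airflow:2.2.3-python3.8   # assumption: replace with whatever base your image is built from
USER root
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
USER airflow
ENTRYPOINT ["/entrypoint.sh"]

With the exec "$@" fallback in place, the worker pod simply executes the "airflow tasks run ..." arguments the scheduler passes in, the task reports its state back to the metadata database, and the web UI will show success or failure instead of sitting in queued/running.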