Unable to execute Airflow KubernetesExecutor
Following the project here, I am trying to integrate the Airflow KubernetesExecutor using an NFS server as the backing-store PV. I have a PV airflow-pv which is linked to the NFS server. The Airflow webserver and scheduler use a PVC airflow-pvc which is bound to airflow-pv. I have placed my DAG files on the NFS server under /var/nfs/airflow/development/<dags/logs>, and I can also see the newly added DAGs in the webserver UI. However, when I execute a DAG from the UI, the scheduler fires up a new pod for that task, but the new worker pod fails to run, saying:
Unable to mount volumes for pod "tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873_development(976b1e64-b46d-11e9-92af-025000000001)": timeout expired waiting for volumes to attach or mount for pod "development"/"tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873". list of unmounted volumes=[airflow-dags]. list of unattached volumes=[airflow-dags airflow-logs airflow-config default-token-hjwth]
Here are my webserver and scheduler deployment files:
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
  namespace: development
spec:
  type: NodePort
  ports:
    - name: web
      protocol: TCP
      port: 8080
  selector:
    app: airflow-webserver-app
    namespace: development
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-webserver-dep
  namespace: development
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-webserver-app
      namespace: development
  template:
    metadata:
      labels:
        app: airflow-webserver-app
        namespace: development
    spec:
      restartPolicy: Always
      containers:
        - name: airflow-webserver-app
          image: airflow:externalConfigs
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
          args: ["-webserver"]
          env:
            - name: AIRFLOW_KUBE_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: AIRFLOW__CORE__FERNET_KEY
            - name: MYSQL_ROOT_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: MYSQL_PASSWORD
            - name: MYSQL_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: MYSQL_PASSWORD
            - name: DB_HOST
              value: mysql-svc.development.svc.cluster.local
            - name: DB_PORT
              value: "3306"
            - name: MYSQL_DATABASE
              value: airflow
            - name: MYSQL_USER
              value: airflow
            - name: MYSQL_PASSWORD
              value: airflow
            - name: AIRFLOW__CORE__EXECUTOR
              value: "KubernetesExecutor"
          volumeMounts:
            - name: airflow-config
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-files
              mountPath: /usr/local/airflow/dags
              subPath: airflow/development/dags
            - name: airflow-files
              mountPath: /usr/local/airflow/plugins
              subPath: airflow/development/plugins
            - name: airflow-files
              mountPath: /usr/local/airflow/logs
              subPath: airflow/development/logs
            - name: airflow-files
              mountPath: /usr/local/airflow/temp
              subPath: airflow/development/temp
      volumes:
        - name: airflow-files
          persistentVolumeClaim:
            claimName: airflow-pvc
        - name: airflow-config
          configMap:
            name: airflow-config
The scheduler YAML file is exactly the same, except that the container args are args: ["-scheduler"].
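A minimal sketch of the part that actually differs in the scheduler Deployment (the container name here is illustrative; everything else mirrors the webserver manifest above):

      containers:
        - name: airflow-scheduler-app    # illustrative name
          image: airflow:externalConfigs
          imagePullPolicy: IfNotPresent
          args: ["-scheduler"]           # the only functional difference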
Here is my airflow.cfg file:
apiVersion: v1
kind: ConfigMap
metadata:
  name: "airflow-config"
  namespace: development
data:
  airflow.cfg: |
    [core]
    airflow_home = /usr/local/airflow
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    executor = KubernetesExecutor
    plugins_folder = /usr/local/airflow/plugins
    load_examples = false
    [scheduler]
    child_process_log_directory = /usr/local/airflow/logs/scheduler
    [webserver]
    rbac = false
    [kubernetes]
    airflow_configmap =
    worker_container_repository = airflow
    worker_container_tag = externalConfigs
    worker_container_image_pull_policy = IfNotPresent
    delete_worker_pods = true
    dags_volume_claim = airflow-pvc
    dags_volume_subpath =
    logs_volume_claim = airflow-pvc
    logs_volume_subpath =
    env_from_configmap_ref = airflow-config
    env_from_secret_ref = airflow-secrets
    in_cluster = true
    namespace = development
    [kubernetes_node_selectors]
    # the key-value pairs to be given to worker pods.
    # the worker pods will be scheduled to the nodes of the specified key-value pairs.
    # should be supplied in the format: key = value
    [kubernetes_environment_variables]
    # the configs below get overwritten by the [kubernetes] configs above
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH = var/nfs/airflow/development/dags
    AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH = var/nfs/airflow/development/logs
    [kubernetes_secrets]
    AIRFLOW__CORE__SQL_ALCHEMY_CONN = airflow-secrets=AIRFLOW__CORE__SQL_ALCHEMY_CONN
    AIRFLOW_HOME = airflow-secrets=AIRFLOW_HOME
    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = https://airflow.crunchanalytics.cloud
    [api]
    auth_backend = airflow.api.auth.backend.default
    [admin]
    # ui to hide sensitive variable fields when set to true
    hide_sensitive_variable_fields = true
After triggering a task manually, the scheduler's logs tell me that KubernetesExecutorConfig() is executed with all values as None, as if it isn't picking up the config. I have tried almost everything I know, but cannot get it to work. Could someone tell me what I am missing?
[2019-08-01 14:44:22,944] {jobs.py:1341} INFO - Sending ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2019-08-01 14:44:22,944] {base_executor.py:56} INFO - Adding to queue: airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py
[2019-08-01 14:44:22,948] {kubernetes_executor.py:629} INFO - Add task ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) with command airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py with executor_config {}
[2019-08-01 14:44:22,949] {kubernetes_executor.py:379} INFO - Kubernetes job is (('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1), 'airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py', KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations={}, volumes=[], volume_mounts=[], tolerations=None))
[2019-08-01 14:44:23,042] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type ADDED
[2019-08-01 14:44:23,046] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending
[2019-08-01 14:44:23,049] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type MODIFIED
[2019-08-01 14:44:23,049] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending
For reference, here are my PV and PVC:
kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-pv
  labels:
    mode: local
    environment: development
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: airflow-pv
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.105.225.217
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-pvc
  namespace: development
spec:
  storageClassName: airflow-pv
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
  selector:
    matchLabels:
      mode: local
      environment: development
Using Airflow version: 1.10.3
Since there are no answers yet, I will share my findings so far. In airflow.cfg, under the kubernetes section, we pass the following values:
dags_volume_claim = airflow-pvc
dags_volume_subpath = airflow/development/dags
logs_volume_claim = airflow-pvc
logs_volume_subpath = airflow/development/logs
From the above config, the scheduler creates the new worker pod as follows (only the volumes and volumeMounts are shown):
"volumes": [
{
"name": "airflow-dags",
"persistentVolumeClaim": {
"claimName": "airflow-pvc"
}
},
{
"name": "airflow-logs",
"persistentVolumeClaim": {
"claimName": "airflow-pvc"
}
}],
"containers": [
{ ...
"volumeMounts": [
{
"name": "airflow-dags",
"readOnly": true,
"mountPath": "/usr/local/airflow/dags",
"subPath": "airflow/development/dags"
},
{
"name": "airflow-logs",
"mountPath": "/usr/local/airflow/logs",
"subPath": "airflow/development/logs"
}]
...}]
K8s does not like multiple volumes pointing to the same PVC (airflow-pvc). To resolve this, I created two PVCs (and PVs), one for the dags (dags_volume_claim = airflow-dags-pvc) and one for the logs (logs_volume_claim = airflow-log-pvc), and that works fine. A sketch of what such manifests could look like follows below.
I do not know whether newer versions of Airflow have already addressed this (I am using 1.10.3). When people use the same PVC, the Airflow scheduler ought to handle this case by creating a pod with a single volume and two volumeMounts referring to that volume, e.g.
"volumes": [
{
"name": "airflow-dags-logs", <--just an example name
"persistentVolumeClaim": {
"claimName": "airflow-pvc"
}
}
"containers": [
{ ...
"volumeMounts": [
{
"name": "airflow-dags-logs",
"readOnly": true,
"mountPath": "/usr/local/airflow/dags",
"subPath": "airflow/development/dags" <--taken from configs
},
{
"name": "airflow-dags-logs",
"mountPath": "/usr/local/airflow/logs",
"subPath": "airflow/development/logs" <--taken from configs
}]
...}]
I deployed a pod with the above config and it works fine!
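If it helps anyone reproduce that test, here is a minimal, self-contained Pod sketch of the same idea; the pod name, image, and command are placeholders I made up, while the PVC name, mount paths, and subPaths come from the snippets above:

apiVersion: v1
kind: Pod
metadata:
  name: airflow-volume-test          # placeholder name
  namespace: development
spec:
  restartPolicy: Never
  containers:
    - name: tester                   # placeholder container
      image: busybox                 # any small image works for a mount test
      command: ["sh", "-c", "ls /usr/local/airflow/dags && sleep 30"]
      volumeMounts:
        - name: airflow-dags-logs
          readOnly: true
          mountPath: /usr/local/airflow/dags
          subPath: airflow/development/dags
        - name: airflow-dags-logs
          mountPath: /usr/local/airflow/logs
          subPath: airflow/development/logs
  volumes:
    - name: airflow-dags-logs        # single volume, mounted twice via subPath
      persistentVolumeClaim:
        claimName: airflow-pvc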