Airflow KubernetesExecutor 和 minikube:调度程序无法连接到 Minikube
Airflow KubernetesExecutor and minikube: Scheduler can't connect to Minikube
我有一个 运行 的 MiniKube,我通过 docker 部署 Airflow - 以这种方式组合:
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
# AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ~/.kube:/home/airflow/.kube
- ./dags/:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.1.0
min_airlfow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airlfow_version_comparable )); then
echo -e "3[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo -e "3[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
exit 1
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo -e "3[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo -e "3[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo -e "3[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "3[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:${AIRFLOW_GID:-0}"
volumes:
- .:/sources
volumes:
postgres-db-volume:
但是 Airflow 和 Kubernetes 之间的连接似乎失败了(删除 AIRFLOW__CORE__EXECUTOR varenv 允许创建):
airflow-scheduler_1 | Traceback (most recent call last):
airflow-scheduler_1 | File "/home/airflow/.local/bin/airflow", line 8, in <module>
airflow-scheduler_1 | sys.exit(main())
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
airflow-scheduler_1 | args.func(args)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
airflow-scheduler_1 | return func(*args, **kwargs)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
airflow-scheduler_1 | return f(*args, **kwargs)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
airflow-scheduler_1 | job.run()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
airflow-scheduler_1 | self._execute()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 686, in _execute
airflow-scheduler_1 | self.executor.start()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/kubernetes_executor.py", line 485, in start
airflow-scheduler_1 | self.kube_client = get_kube_client()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/kube_client.py", line 145, in get_kube_client
airflow-scheduler_1 | client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/kube_client.py", line 40, in _get_kube_config
airflow-scheduler_1 | config.load_incluster_config()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 93, in load_incluster_config
airflow-scheduler_1 | InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 45, in load_and_set
airflow-scheduler_1 | self._load_config()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 51, in _load_config
airflow-scheduler_1 | raise ConfigException("Service host/port is not set.")
airflow-scheduler_1 | kubernetes.config.config_exception.ConfigException: Service host/port is not set.
我的想法是 Airflow Scheduler 没有正确找到 kube 配置文件。我安装了卷 ~/.kube:/home/airflow/.kube
但无法找到使其工作的方法。
使用 Docker Compose 到 运行 KubernetesExecutor 似乎是个坏主意。
你为什么要这样做?
使用官方 Helm Chart 更有意义 - 它更易于管理和配置,您可以轻松地将它部署到您的 minikube,并且它可以与 KubernetesExecutor 一起开箱即用。
https://airflow.apache.org/docs/helm-chart/stable/index.html
我有一个 运行 的 MiniKube,我通过 docker 部署 Airflow - 以这种方式组合:
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: KubernetesExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
# AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ~/.kube:/home/airflow/.kube
- ./dags/:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.1.0
min_airlfow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airlfow_version_comparable )); then
echo -e "3[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo -e "3[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
exit 1
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo -e "3[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo -e "3[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo -e "3[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "3[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:${AIRFLOW_GID:-0}"
volumes:
- .:/sources
volumes:
postgres-db-volume:
但是 Airflow 和 Kubernetes 之间的连接似乎失败了(删除 AIRFLOW__CORE__EXECUTOR varenv 允许创建):
airflow-scheduler_1 | Traceback (most recent call last):
airflow-scheduler_1 | File "/home/airflow/.local/bin/airflow", line 8, in <module>
airflow-scheduler_1 | sys.exit(main())
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
airflow-scheduler_1 | args.func(args)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
airflow-scheduler_1 | return func(*args, **kwargs)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
airflow-scheduler_1 | return f(*args, **kwargs)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
airflow-scheduler_1 | job.run()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
airflow-scheduler_1 | self._execute()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 686, in _execute
airflow-scheduler_1 | self.executor.start()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/kubernetes_executor.py", line 485, in start
airflow-scheduler_1 | self.kube_client = get_kube_client()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/kube_client.py", line 145, in get_kube_client
airflow-scheduler_1 | client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/airflow/kubernetes/kube_client.py", line 40, in _get_kube_config
airflow-scheduler_1 | config.load_incluster_config()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 93, in load_incluster_config
airflow-scheduler_1 | InClusterConfigLoader(token_filename=SERVICE_TOKEN_FILENAME,
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 45, in load_and_set
airflow-scheduler_1 | self._load_config()
airflow-scheduler_1 | File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/config/incluster_config.py", line 51, in _load_config
airflow-scheduler_1 | raise ConfigException("Service host/port is not set.")
airflow-scheduler_1 | kubernetes.config.config_exception.ConfigException: Service host/port is not set.
我的想法是 Airflow Scheduler 没有正确找到 kube 配置文件。我安装了卷 ~/.kube:/home/airflow/.kube
但无法找到使其工作的方法。
使用 Docker Compose 到 运行 KubernetesExecutor 似乎是个坏主意。
你为什么要这样做?
使用官方 Helm Chart 更有意义 - 它更易于管理和配置,您可以轻松地将它部署到您的 minikube,并且它可以与 KubernetesExecutor 一起开箱即用。
https://airflow.apache.org/docs/helm-chart/stable/index.html