Airflow scheduler cannot connect to the Kubernetes service API

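I am trying to set up Airflow with the Kubernetes executor. When the scheduler container starts, it hangs for a while and then I get the HTTPS timeout error shown below. The IP address in the message is correct, and from inside the container I can run the following commands, so I assume no firewall or anything else is blocking access:

curl kubernetes:443
curl 10.96.0.1:443
nc -zv 10.96.0.1 443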

I get the same error on a local Kubernetes cluster and on AWS EKS; the IP address in the message differs between clusters.

I searched Google for a solution but did not find any similar cases.

  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 335, in run
    self.worker_uuid, self.kube_config)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/executors/kubernetes_executor.py", line 359, in _run
    **kwargs):
  File "/usr/local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 144, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.6/site-packages/kubernetes/watch/watch.py", line 48, in iter_resp_lines
    for seg in resp.read_chunked(decode_content=False):
  File "/usr/local/lib/python3.6/site-packages/urllib3/response.py", line 781, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.6/site-packages/urllib3/response.py", line 430, in _error_catcher
    raise ReadTimeoutError(self._pool, None, "Read timed out.")
urllib3.exceptions.ReadTimeoutError: HTTPSConnectionPool(host='10.96.0.1', port=443): Read timed out.

Update: I have found the cause of my problem, but there is no solution yet: https://github.com/kubernetes-client/python/issues/990

The client request timeout can be set through an environment variable. In your charts/airflow.yaml file, set the variable as follows, which should fix your problem:

AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: {"_request_timeout" : [50, 50]}
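To illustrate roughly how a value like this is interpreted, here is a minimal Python sketch. It assumes the variable holds a JSON object and that `_request_timeout` is meant as a (connect timeout, read timeout) pair in seconds, which is one of the forms the Kubernetes Python client accepts. `parse_request_args` is a hypothetical helper written for this example, not Airflow's own code.

```python
import json
import os

# Name of the environment variable used in the setting above.
ENV_NAME = "AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS"


def parse_request_args(raw):
    """Parse the JSON value and normalise `_request_timeout` (hypothetical helper)."""
    # An unset or empty variable means "no extra request arguments".
    args = json.loads(raw) if raw else {}
    timeout = args.get("_request_timeout")
    if isinstance(timeout, list):
        # JSON has no tuple type, so a two-element list is converted to the
        # (connect timeout, read timeout) tuple form.
        args["_request_timeout"] = tuple(timeout)
    return args


if __name__ == "__main__":
    # Fall back to the value from this answer when the variable is not set.
    raw = os.environ.get(ENV_NAME, '{"_request_timeout": [50, 50]}')
    print(parse_request_args(raw))
```

With the value above, both the connect timeout and the read timeout would be 50 seconds. The read timeout is the one relevant to the `Read timed out.` error in the question.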

Full airflow.yaml:

airflow:
  image:
    repository: airflow-docker-local
    tag: 1
  executor: Kubernetes
  service:
    type: LoadBalancer
  config:
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://postgres:airflow@airflow-postgresql:5432/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://postgres:airflow@airflow-postgresql:5432/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:airflow@airflow-redis-master:6379/0
    AIRFLOW__CORE__REMOTE_LOGGING: True
    AIRFLOW__CORE__REMOTE_LOG_CONN_ID: my_s3_connection
    AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: s3://xxx-airflow/logs
    AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 25
    AIRFLOW__CORE__LOAD_EXAMPLES: True
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: True
    AIRFLOW__CORE__FERNET_KEY: -xyz=
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: airflow-docker-local
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: 1
    AIRFLOW__KUBERNETES__WORKER_CONTAINER_IMAGE_PULL_POLICY: Never
    AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME: airflow
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM: airflow
    AIRFLOW__KUBERNETES__NAMESPACE: airflow
    AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: {"_request_timeout" : [50, 50]}


persistence:
  enabled: true
  existingClaim: ''

workers:
  enabled: true

postgresql:
  enabled: true

redis:
  enabled: true