损坏的 DAG:(...) 没有名为 docker 的模块

Broken DAG: (...) No module named docker

我所有 运行 都有 BigQuery 连接器,但我在 Docker 容器中有一些现有脚本,我希望在 Cloud Composer 而不是 App Engine Flexible 上安排。

我有以下脚本,似乎遵循了我能找到的示例:

import datetime
from airflow import DAG
from airflow import models
from airflow.operators.docker_operator import DockerOperator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
    # Setting start date as yesterday starts the DAG immediately
    'start_date': yesterday,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

schedule_interval = '45 09 * * *'

dag = DAG('xxx-merge', default_args=default_args, schedule_interval=schedule_interval)

hfan = DockerOperator(
   task_id = 'hfan',
   image   = 'gcr.io/yyyyy/xxxx'
 )

...但是当尝试 运行 它在网络上告诉我 UI:

Broken DAG: [/home/airflow/gcs/dags/xxxx.py] No module named docker

是否可能 Docker 未配置为在 Cloud Composer 运行s 的 Kubernetes 集群中工作?或者我只是在语法中遗漏了什么?

这意味着:无论您的 Airflow 实例安装在哪里,都缺少名为 docker 的 Python 包。

如果我配置我的个人机器,我可以使用

安装缺少的包
pip install docker

编辑

在 docker 组件的源代码中 https://airflow.incubator.apache.org/_modules/airflow/operators/docker_operator.html

有一个导入语句:

from docker import Client, tls

所以在我看来,新错误 cannot import name Client 与损坏的安装或错误版本的 docker 软件包有关。

如 tobi6 的回答所述,您需要在 Composer 环境中安装 docker 的 PyPI 包。有说明 here 在您的环境中以特定包版本安装 PyPI 包。

我通过在 composer 的 PyPI 部分安装 docker-py==1.10.6 解决了这个问题。

但是,要让 DockerOperator 正常工作需要更多的努力,因为作曲家工作人员无法访问 Docker 守护程序。前往 GCP 控制台并执行以下步骤;在获得 cluster credentials).

之后
  1. 将当前部署配置导出到文件

    kubectl get deployment airflow-worker -o yaml --export > airflow-worker-config.yaml

  2. 编辑 airflow-worker-config.yaml (example link) 以挂载 docker.sock 和 docker,将 airflow-worker 的特权访问权限授予 运行 docker 命令

  3. 应用部署设置

    kubectl apply -f airflow-worker-config.yaml

如其他答案中所述,Docker Python 客户端未预安装在 Cloud Composer 环境中。要安装它,请将其添加为环境配置中的 PyPI 依赖项。

警告:默认情况下,DockerOperator 将尝试与 /var/run/docker.sock 的 Docker API 对话以管理容器.此套接字 安装在 Composer Airflow worker pods 内,手动配置它 不推荐 Use of DockerOperator is only recommended in Composer if configured to talk to Docker daemons running outside of your environments.

为了避免更脆弱的配置或绕过 Kubernetes 的意外(因为它负责管理整个集群中的容器),您应该使用 KubernetesPodOperator。如果您要将容器启动到 GKE 集群(或 Composer 环境的集群)中,则可以使用 GKEPodOperator,它具有更具体的 GCP-related 参数。

解决我的问题的方法是在 Dockerfile

中添加单词“docker”
&& pip install pyasn1 \
&& pip install apache-airflow[crypto,docker,celery,postgres,hive,jdbc,mysql,ssh${AIRFLOW_DEPS:+,}${AIRFLOW_DEPS}]==${AIRFLOW_VERSION} \
&& pip install 'redis==3.2' \