Airflow DockerOperator 找不到某些图像但可以找到其他图像

Airflow DockerOperator cannot find some images but can find others

尝试在 Airflow 中使用 Docker 运算符时出现以下错误。气流设置对我来说是不可见的(它是由另一个团队在我无法访问的机器上设置的 运行ning 并且负责的团队没有响应)。我从自己编写的 docker 文件创建了 docker 图像。名称 cmprod 指的是 docker 图像。

ImageNotFound: 404 Client Error: Not Found ("pull access denied for cmprod, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")

我不熟悉 docker 登录的使用,我不确定它是否适用于这种情况,因为我可以 运行 某些图像而不是其他图像。 起初我虽然错误地输入了 docker 图像的名称,但我检查并仔细检查了。下面是 docker images 的输出。我能够 运行 通过气流成功地对图像进行测试。

REPOSITORY               TAG                 IMAGE ID            CREATED             SIZE
cm_prod                  latest              08f408557eb7        15 hours ago        2.12GB
cmprod                   latest              08f408557eb7        15 hours ago        2.12GB
<none>                   <none>              4af8c991ea19        15 hours ago        730MB
<none>                   <none>              9da4759a3316        15 hours ago        64.2MB
condatest                latest              e24563f9bb48        5 days ago          2.12GB

我以为我可能错误地使用了 docker 运算符,但我可以 运行 一些其他图像。我认为可能存在气流配置问题,其中不允许某些操作系统或 运行 不允许某些权限,但我一直无法找到任何关于这是否可能的文档。

我的测试没有显示上述任何因素来确定 docker 图像是否可以使用 docker 运算符通过气流找到。这个问题似乎不适合反复试验。任何关于可能发生的事情的建议将不胜感激。

我能够在我的浏览器中看到气流 UI 并触发 dags,并且有一个共享目录,我可以在其中转储我的 dag 规范脚本。气流版本:1.10.3.

docker 的版本信息如下 docker version:

Client: Docker Engine - Community
 Version:           19.03.6
 API version:       1.40
 Go version:        go1.12.16
 Git commit:        369ce74a3c
 Built:             Thu Feb 13 01:29:29 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.6
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.12.16
  Git commit:       369ce74a3c
  Built:            Thu Feb 13 01:28:07 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.10
  GitCommit:        b34a5c8af56e510852c35414db4c1f4fa6172339
 runc:
  Version:          1.0.0-rc8+dev
  GitCommit:        3e425f80a8c931f88e6d94a8c831b9d5aa481657
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

编辑: 请求了气流 DAG 代码。我对 post 整个事情犹豫不决,因为我从一个离开的团队成员那里继承了一些代码,我觉得 dag 中的一些代码最好作为一个单独的脚本来实现。以下是最相关的代码块。让我知道是否缺少任何内容。为了清晰起见,我省略了这些块之间的部分,但如果似乎没有任何效果,可以包括在内。

代码块 1:导入依赖项

from functools import reduce
import os, os.path
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.utils.helpers import chain

代码块 2:DAG 和 OPERATOR 实例化

# create SQL operators
def create_SQL_operator(taskfile, dag):
    """
    Creates a MsSQL operator for a given DAG.
    """
    op = MsSqlOperator(
        task_id=taskfile,
        sql=readSQL(os.path.join(ProjDir, taskfile)),
        mssql_conn_id='clarity',
        autocommit=True,
        database='clarity',
        dag=dag
        )
    return op

# Airflow arguments
default_args = {
    'owner': 'airflow',
    'description': 'Parallel SQL DAG',
    'depend_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['*PERSONTOEMAIL*'],
    'email_on_failure': False,
    'email_on_retry': True
}

# DAG definition
DAG = DAG(ProjName + '_and_infer',
          description='Running parallel SQLs for project: {} and inference on the data'.format(ProjName),
          default_args=default_args,
          schedule_interval=CronTime,   # '0 */2 * * *',  #every 2 hours
          concurrency=50,               # setup to allow 50 concurrent parallel tasks
          catchup=False)
t_predict = DockerOperator(
        task_id='dockerPredict',
        image='cmprod',
        api_version='auto',
        auto_remove=True,
        volumes=['*ABSOLUTEPATHTOMOUNT*:/ds-cm'],
        command='bash inference.sh ',
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        dag=DAG)

# Create SQL task operators in Airflow global space
ops = []
ops = [(order, create_SQL_operator(taskfile, DAG)) for order, taskfile in sql_rank]
ops.sort(key=lambda tup: tup[0])

# create cluster ops list
from itertools import groupby
from operator import itemgetter
opsList = []
opsList = [[j for i, j in grouper] for order, grouper in groupby(ops, key=itemgetter(0))]

# flatten list with only 1 element: Airflow chain() cannot accept list of lists!!
chainList = []
chainList = [reduce(plus, list) if len(list) == 1 else list for list in opsList]
chainList.append(t_predict)

# create final DAG graph
exec(r' >> '.join([r'chainList['+str(i)+r']' for i in range(len(chainList))]))

更新 因为我最初 post 编辑了这个问题,所以我将 condatest 图像替换到上面的代码中,并设法以不同的方式出错:挂载目录中缺少 shell 脚本。

当我复制丢失的文件并再次 运行 时,airflow 无法再找到最新的图像。查看了一下,新复制的脚本没有执行权限,就加了权限。 Airflow 仍然找不到以前工作的 docker 容器。

我删除了shell脚本,airflow又可以找到容器了。这是否意味着问题与 Linux 权限有关?我不清楚安装的驱动器的内容如何影响气流检测容器的能力。此外,我知道我能够 运行 使用 docker 容器在过去由气流中的 docker 对象启动的相同脚本。

将 airflow 升级到 airflow2 后,日志提供了一些额外的信息。 Airflow 已在多台服务器上配置为 运行,并且已在每台服务器上设置 docker,但未使用图像注册表。似乎当作业调度程序试图在我构建 docker 图像的服务器以外的服务器上执行 dag 时,该图像不可用。看来我之前找到的解决方法恰逢我的工作安排在哪个服务器上的幸运抽奖。

为了解决这个问题,我们将调度程序配置为仅使用一台服务器。