Airflow 2.0.1/Python 3.7.9 自定义挂钩的 ModuleNotFoundError

Airflow 2.0.1/Python 3.7.9 ModuleNotFoundError for custom hook

我的原生气流构建中有如下结构

dags/cust_dag.py dags/jhook.py --contains class UtilTriggers 下有多个方法

在 cust_dag 代码中,我将 hook/module 称为:

从 jhook 导入 UtilTriggers 作为触发器

当我检查 Airflow UI 时,由于 cust_dag 提到错误,我遇到了问题 ModuleNotFoundError:没有名为 jhook

的模块

相同类型的代码在 composer 1.9 上运行,目前我在 运行 本机气流上。

我也尝试添加 init.py 文件,并创建了一个新文件夹 job_trigger,我在该文件夹下添加了该文件,但仍然无法正常工作。

我已经尝试过这个问题中提到的解决方案

即在挂钩自定义模块和 dag 文件中添加以下代码行 导入系统 sys.path.insert(0,os.path.abspath(os.path.dirname(文件)))

如果一切正常,请指导我出现此 ModuleNotFound 错误的原因。

根据您的评论,您收到的错误消息是“导入错误”,看来问题仅与 python 有关。

dag1.py

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

from new1 import hi as h1

default_dag_args = {
  # The start_date describes when a DAG is valid / can be run. Set this to a
  # fixed point in time rather than dynamically, since it is evaluated every
  # time a DAG is parsed. See:
  # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
  'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
      'demo_run',
      schedule_interval=datetime.timedelta(days=1),
      default_args=default_dag_args) as dag:

  # An instance of an operator is called a task. In this case, the
  # hello_python task calls the "greeting" Python function.
  hello_python = python_operator.PythonOperator(
      task_id='hello_world',
      python_callable=h1.funt,
      op_kwargs={"x" : "python"})

  # Likewise, the goodbye_bash task calls a Bash script.
  goodbye_bash = bash_operator.BashOperator(
      task_id='bye',
      bash_command='echo Goodbye.')


new1.py


class hi:
   @staticmethod
   def funt(x):
       return x + " is a programming language"


  1. 由于您将所有方法都用作静态方法,因此无需将 self 传递给您的 method.The 启动方法中的 self 关键字引用该对象。因为静态方法可以在不创建对象的情况下调用,所以它们没有 self 关键字。
  2. 如果您在方法中传递一些参数,请通过提供 op_args and op_kwargs arguments.
  3. 确保这些参数也传递给 DAG 任务

如果这将是一个 kubernetes 问题,请回答您的问题,因为它托管在那里。 此问题与 kubernetes 无关,因为

  • 当我们创建 Composer 环境时,Composer 服务会为每个环境创建一个 GKE 集群。集群是自动命名和标记的,不应由用户手动删除。集群是通过 Deployment Manager 创建和管理的。
  • 如果集群被删除,那么环境将无法修复,需要重新创建。 Kubernetes 错误将类似于 “Http 错误状态代码:400 Http 错误消息:BAD REQUEST”