我们如何在 Airflow 中导入 dags?

How do we import dags in Airflow?

大家好,我是 Airflow 的新手,我正在尝试将我自己的自定义 jar 导入为使用 Talend Open Studio BigData 生成的 DAG,当我通过终端导入我的 DAG 时遇到了一些麻烦, 没有显示错误,我的 DAG 没有添加到 Airflow UI

中的 DAG 列表中

这是我的 .py 文件代码:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from airflow.utils.email import send_email

import os
import sys


bib_app = "/home/user/Docs/JObforAirflow/test/test_run.sh"
default_args = {
    'owner': 'yabid',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 29),
    'email': ['user@user.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'provide_context': True    }

args = {
  'owner': 'yabid'
  ,'email': ['user@user.com']
  ,'start_date': datetime(2019, 4, 25)
  , 'provide_context': True    }

dag = DAG('run_jar', default_args=default_args)

t1 = BashOperator(
    task_id='dependency',
    bash_command= bib_app,
    dag=dag)


t2 = BashOperator(
 task_id = 't2',
 dag = dag,
 bash_command = 'java -cp /home/user/Docs/JObforAirflow/test/jobbatch.jar'
 )

t1.set_upstream(t2)

您是否将此 DAG 文件复制到 ~/airflow/dags

您所有的 *.py 文件都需要复制到 AIRFLOW_HOME/dags,其中 AIRFLOW_HOME=~/airflow

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from airflow.utils.email import send_email

import os
import sys


bib_app = "/home/user/Docs/JObforAirflow/test/test_run.sh"
default_args = {
    'owner': 'yabid',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 25),
    'email': ['user@user.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'provide_context': True
}


dag = DAG('run_jar', default_args=default_args)

t1 = BashOperator(
    task_id='dependency',
    bash_command= bib_app,
    dag=dag)


t2 = BashOperator(
    task_id = 't2',
    dag = dag,
    bash_command = 'java -cp /home/user/Docs/JObforAirflow/test/jobbatch.jar')

t1 >> t2
  1. 您有 'email': ['user@user.com], 行包含非闭合字符串:'user@user.com。如果您尝试 运行 Airflow 中的此代码,DAG 将失败。
  2. 如另一个答案中所述,您应该将所有 DAG 放在 AIRFLOW_HOME/dags 文件夹中。添加新的 DAG 文件后,我建议您重新启动 airflow-schedulerairflow-webserver