DAG does not trigger from the console
I have been trying to create a sample DAG. The test runs fine on the command line, but when I trigger the same DAG from the console it stays in the running state forever. The code is below for reference; any help would be appreciated. I really don't understand why it gets stuck in the running state. The Airflow scheduler also shows no output when I trigger the DAG manually from the console.
from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.hooks import HttpHook
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'flow',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# Command to run on the remote host over SSH
node_bash = """
pwd
"""

t1 = SSHOperator(
    ssh_conn_id='datalakefs',
    task_id='loadfstodb',
    command=node_bash,
    dag=dag)

t1
Here is the output of the test command.
sudo airflow test flow loadfstodb 28-02-2020
[2020-02-28 10:50:53,602] {settings.py:254} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=13429
[2020-02-28 10:50:54,090] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-02-28 10:50:54,090] {dagbag.py:403} INFO - Filling up the DagBag from /home/airflow/dag
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'HttpHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'PostgresHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
[2020-02-28 10:50:54,354] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,368] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,369] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,369] {taskinstance.py:867} INFO - Starting attempt 1 of 2
[2020-02-28 10:50:54,369] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,371] {taskinstance.py:887} INFO - Executing <Task(SSHOperator): loadfstodb> on 2020-02-28T00:00:00+00:00
[2020-02-28 10:50:54,390] {ssh_operator.py:92} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2020-02-28 10:50:54,421] {base_hook.py:84} INFO - Using connection to: id: datalakefs. Host: datalake, Port: None, Schema: None, Login: airflow, Password: XXXXXXXX, extra: XXXXXXXX
[2020-02-28 10:50:54,422] {ssh_hook.py:166} WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,424] {ssh_hook.py:170} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,431] {transport.py:1572} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-02-28 10:50:54,543] {transport.py:1572} INFO - Authentication (publickey) successful!
[2020-02-28 10:50:54,543] {ssh_operator.py:109} INFO - Running command:
pwd
[2020-02-28 10:50:55,760] {ssh_operator.py:143} INFO - /home/airflow
This turned out to be because the Airflow scheduler refreshes its DAGs at an interval, so it takes some time for the scheduler to notice that a new DAG has been added.
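One way to confirm the scheduler can actually see the DAG, instead of just waiting, is to parse the DAG folder yourself. A minimal sketch, assuming Airflow 1.10.x and the /home/airflow/dag folder shown in the log above:

from airflow.models import DagBag

# Parse the same folder the scheduler reads; 'flow' should appear in .dags
dagbag = DagBag('/home/airflow/dag')
print(list(dagbag.dags))       # DAG ids that parsed successfully
print(dagbag.import_errors)    # parse errors that would keep a DAG hidden

If 'flow' is listed and there are no import errors, the file itself is fine and the scheduler only needs time to pick it up.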
And the initial run was stuck forever because of a leftover entry in the Postgres "job" table. I had to delete the specific entries related to that Job and then start my DAG again. That worked.
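For reference, a minimal sketch of that cleanup done through Airflow's own ORM session rather than raw SQL, assuming Airflow 1.10.x with direct access to the metadata DB; the dag_id and state filters are illustrative, and you should back up the table before deleting anything:

from airflow import settings
from airflow.jobs import BaseJob

session = settings.Session()

# Remove job rows still marked 'running' for this DAG (illustrative filter)
stale = (session.query(BaseJob)
         .filter(BaseJob.dag_id == 'flow', BaseJob.state == 'running'))
for job in stale:
    session.delete(job)

session.commit()
session.close()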