Airflow - Script does not execute when triggered

I have an Airflow script that tries to insert data from one table into another; I am using an Amazon Redshift database. The script given below does not execute when triggered. The task's status stays at 'no status' in the Graph View, and no other error is shown.

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB

def db_login():
    global db_conn
    try:
        db_conn = psycopg2.connect(
        " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
        print('Connection Task Complete: Connected to DB')
        return (db_conn)


#######################

def insert_data():
    cur = db_conn.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2 ;""")
    db_conn.commit()
    print('ETL Task Complete')

def job_run():
    db_login()
    insert_data()

##########################################

t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=job_run,
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1

Can anyone help figure out where the problem is? Thanks.

Updated code (05/28)

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB


def data_warehouse_login():
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(
        " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
        print('Connected successfully')
    return (dwh_connection)

def insert_data():
    cur = dwh_connection.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
    dwh_connection.commit()
    print('Task Complete: Insert success')


def job_run():
    data_warehouse_login()
    insert_data()



##########################################

t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run(),
    # bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1

Log messages when the script is run:
[2018-05-28 11:36:45,300] {jobs.py:343} DagFileProcessor26 INFO - Started process (PID=26489) to work on /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:45,306] {jobs.py:534} DagFileProcessor26 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2018-05-28 11:36:45,310] {jobs.py:1521} DagFileProcessor26 INFO - Processing file /Users/user/airflow/dags/sample.py for tasks to queue
[2018-05-28 11:36:45,310] {models.py:167} DagFileProcessor26 INFO - Filling up the DagBag from /Users/user/airflow/dags/sample.py
/Users/user/anaconda3/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
  """)
Task Complete: Insert success
[2018-05-28 11:36:50,964] {jobs.py:1535} DagFileProcessor26 INFO - DAG(s) dict_keys(['latest_only', 'example_python_operator', 'test_utils', 'example_bash_operator', 'example_short_circuit_operator', 'example_branch_operator', 'tutorial', 'example_passing_params_via_test_command', 'latest_only_with_trigger', 'example_xcom', 'example_http_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_branch_dop_operator_v3', 'example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2', 'example_trigger_controller_dag', 'insert_data2']) retrieved from /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:51,159] {jobs.py:1169} DagFileProcessor26 INFO - Processing example_subdag_operator
[2018-05-28 11:36:51,167] {jobs.py:566} DagFileProcessor26 INFO - Skipping SLA check for <DAG: example_subdag_operator> because no tasks in DAG have SLAs
[2018-05-28 11:36:51,170] {jobs.py:1169} DagFileProcessor26 INFO - Processing sample_dag
[2018-05-28 11:36:51,174] {jobs.py:354} DagFileProcessor26 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
pickle_dags)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1581, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1171, in _process_dags
dag_run = self.create_dag_run(dag)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 776, in create_dag_run
if next_start <= now:
TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'

Log from the Graph View

*** Log file isn't local.
*** Fetching here: http://:8793/log/sample_dag/DWH_Connect/2018-05-28T12:23:57.595234
*** Failed to fetch log file from worker.

*** Reading remote logs...
*** Unsupported remote log location.

You need a BashOperator and a PythonOperator

rather than a single PythonOperator.

You are getting the error because PythonOperator does not have a bash_command parameter:

t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=db_login,
    dag=dag
    )

t2 = BashOperator(
    task_id='Run_Python_File',
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag
    )

t1 >> t2
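
The t1 >> t2 at the end declares the BashOperator as downstream of the PythonOperator, so the Run_Python_File task only starts once DBConnect has succeeded.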

Extending on the answer kaxil provided: you should use an IDE to develop Airflow code. PyCharm works fine for me.

That being said, please make sure to look up the available fields in the documentation next time. For the PythonOperator, the docs are here:

https://airflow.apache.org/code.html#airflow.operators.PythonOperator

The signature looks like this:

class airflow.operators.PythonOperator(python_callable, op_args=None, op_kwargs=None, provide_context=False, templates_dict=None, templates_exts=None, *args, **kwargs)

For the BashOperator, the docs are here:

https://airflow.apache.org/code.html#airflow.operators.BashOperator

The signature is:

class airflow.operators.BashOperator(bash_command, xcom_push=False, env=None, output_encoding='utf-8', *args, **kwargs)

The highlighting of python_callable and bash_command is mine, to show the parameters you have been using.

My suggestion is to make sure you read the documentation thoroughly before using an operator.
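
To make the two signatures concrete, here is a minimal sketch using each operator only with its documented parameter. The DAG name, task ids and the print_hello callable are placeholders of mine, not taken from the question:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag = DAG('operator_signature_demo', start_date=datetime(2018, 1, 1), schedule_interval='@once')

def print_hello():
    print('hello from a PythonOperator')

# PythonOperator takes python_callable: a callable object, passed without parentheses
py_task = PythonOperator(
    task_id='python_example',
    python_callable=print_hello,
    dag=dag)

# BashOperator takes bash_command: a string executed in a bash shell
bash_task = BashOperator(
    task_id='bash_example',
    bash_command='echo hello from a BashOperator',
    dag=dag)

py_task >> bash_task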

EDIT

Having seen the updated code, there is one thing left:

Make sure that when you define the python_callable in a task you do so without parentheses, otherwise the code will be called right away (which is very unintuitive if you don't know about it). So your code should look like this:

t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run,
    dag=dag)
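
With the parentheses, job_run() is executed the moment the scheduler parses the DAG file, which is why the log above shows 'Task Complete: Insert success' coming from the DagFileProcessor rather than from an actual task run; the operator then receives the function's return value (None) instead of a callable.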