Airflow - 脚本在初始化期间执行

Airflow - Script executes during initialization

我有一个 Airflow 脚本,预计会将数据从 table_2 插入到 table_1。作为气流初始化过程的一部分,我看到插入函数在后台保持 运行 即使我没有触发它或安排它。我想知道使它自动触发的脚本有什么问题。我需要在下面的脚本中修改什么以确保它不会在初始化过程中执行命令。

## Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
'email': ['admin@mail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('sample', default_args=default_args)


#######################

def db_login():
    global db_con
try:
    db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
    print("I am unable to connect to the database.")
print('Connection Task Complete: Connected to DB')
return(db_con)


#######################

def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2 ;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')


db_login()
insert_data()
db_con.close()

##########################################


t1 = BashOperator(
task_id='db_con',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)

t2 = BashOperator(
task_id='insert',
python_callable=insert_data(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)


t1.set_downstream(t2)

任何人都可以提供帮助。谢谢

更新代码:

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io



default_args = {
'owner': 'admin',
#'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
 'email': ['admin@mail.com'],
 'email_on_failure': True,
 'email_on_retry': True,
 'retries': 1,
 'retry_delay': timedelta(minutes=1), }

dag = DAG('sample', default_args=default_args, catchup=False, schedule_interval="@once")


def db_login():
    global db_con
    try:
        db_con = psycopg2.connect(
        " dbname = 'db' user = 'user' password = 'password' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection success')
    return (db_con)

def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')

def load_etl():
    db_login()
    insert_data()
    dwh_connection.close()

#Function to execute the query
load_etl()

t1 = BashOperator(
    task_id='db_connection',
    python_callable=load_etl(),
    bash_command='python3 ~/airflow/dags/sample.py',
    email_on_failure=True,
    email=['admin@mail.com'],
    dag=dag)

#t2 = BashOperator(
#task_id='ops_load_del',
#python_callable=insert_data(),
#bash_command='python3 ~/airflow/dags/sample.py',
#email_on_failure=True,
#email=['admin@mail.com'],
#dag=dag)

t1
#t1.set_downstream(t2)

如果您从 Python 式的角度来看您的 DAG,缩进会引发一些想法。

首先,尝试使用 python name-of-dag.py 执行 DAG。是的,不要使用 airflow 命令。 Airflow 的某些部分也正在这样做,以检查要做什么。

现在,如果正在执行某些代码,这可能与意图有关。

功能分析

这里的缩进好像坏了:

def db_login(): 全球 db_con 尝试: db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require'") 除了: 打印("I am unable to connect to the database.") 打印('Connection Task Complete: Connected to DB') return(db_con)

应该是:

def db_login():
    global db_con
    try:
        db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection Task Complete: Connected to DB')
    return(db_con)

否则一直执行最左边的代码。

此外:global 变量将不必在 Airflow 的其他方法中可用!要共享连接,请使用例如气流 XCOMhttps://airflow.apache.org/concepts.html#xcoms

直接在 DAG 中调用函数

此外,出于某种我不知道的原因,你想执行一些功能完全不受 Airflow 控制但每次执行时。

db_login() 
insert_data()
db_con.close()

每次调用 DAG 时 都会执行此代码(可能很多次)并且可能与您想要的计划完全不同。

如果您希望此代码用于测试目的,您可能希望将其放入主调用中:

if __name__ == '__main__':
    db_login() 
    insert_data()
    db_con.close()

即使您这样做 - 关闭操作仅在此工作流中可用,而在 DAG 中不可用。没有关闭连接的任务。

由于您使用的是 PythonOperator 构建一个小的 def 来执行此操作可能也是明智的,并且只有一个任务调用此 def:

def load_etl():
    db_login() 
    insert_data()
    db_con.close()

TL/DR: 消除所有缩进错误,以便如果您纯粹使用 Python 调用文件,则不会执行任何代码。

编辑

这也意味着没有函数调用在任务或 def 之外。这一行

#Function to execute the query
load_etl()

将被执行,因为它不是任务或定义的一部分。它必须被删除。那么它应该可以工作,因为函数调用是任务的一部分。

因为这个函数是一个Python函数,你应该使用一个PythonOperator和它的参数python_callable=load_etl(注意:行尾没有括号)