Airflow DAG successfully executed but tasks didn't run
I have a DAG in Airflow with a single task (a PythonOperator). I forced a run from the GUI and it got a success status, but the task never actually executed, so the DAG did nothing. The DAG code is as follows:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
import pandas as pd
import datetime as dt
import json
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2019, 8, 29, 18, 0, 0),
    'concurrency': 1,
    'retries': 3
}

def extraction_from_raw_data(conn_id):
    mysqlserver = MySqlHook(conn_id)
    query = """select * from antifraud.email_fraud_risk
               WHERE ts >= DATE_ADD(CURDATE(), INTERVAL -3 DAY)"""
    raw_data = mysqlserver.get_records(query)
    raw_data = pd.DataFrame(raw_data)
    data_as_list = []
    for i in range(len(raw_data)):
        dict1 = {}
        dict1.update(json.loads(raw_data.at[i, 'raw_content']))
        data_as_list.append(dict1)
    json_data_df = pd.DataFrame(data_as_list)
    final_data = pd.concat([raw_data['email_id'], json_data_df], axis=1)
    return final_data

with DAG('emailage_data',
         default_args=default_args,
         schedule_interval=timedelta(days=1)
         ) as dag:

    extraction_from_raw_data = PythonOperator(
        task_id='extraction_from_raw_data',
        op_kwargs={'conn_id': 'services'},
        python_callable=extraction_from_raw_data)

    extraction_from_raw_data
All the workers, the scheduler, and the webserver are working correctly, since I can run the hello_world DAG (and its downstream tasks) successfully.
In general, you should always leave at least one extra schedule interval between the DAG's start date and the current time.

The Airflow documentation states:
Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let's Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
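To see why the run appears to do nothing, it helps to work out when the scheduler considers the first interval complete. A minimal sketch, using the start_date and schedule_interval from the question (plain datetime arithmetic, no Airflow required):

```python
from datetime import datetime, timedelta

# Values from the DAG in the question
start_date = datetime(2019, 8, 29, 18, 0, 0)
schedule_interval = timedelta(days=1)

# The run stamped with start_date only fires once the interval
# it covers has ENDED, i.e. one schedule_interval later.
first_run_fires_at = start_date + schedule_interval
print(first_run_fires_at)  # 2019-08-30 18:00:00
```

So before 2019-08-30 18:00 no scheduled period has ended yet, and the DAG has nothing to run. Backdating start_date by at least one schedule_interval (e.g. a few days) gives the scheduler a completed period to execute.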