Airflow: import task dependencies from another file
My Airflow DAG has 4 tasks, t1, t2, t3, and t4, with task IDs task_id1, task_id2, task_id3, and task_id4.
However, the execution order of the tasks (for example t1 >> t2 >> t3 >> t4)
is to be read from a text file.
Example: the text file will have:
t1
t2
t3
t4
or
t1
t2
t4
t3
So the order will be: t1 >> t2 >> t3 >> t4 or t1 >> t2 >> t4 >> t3
But if I try:
f= open("file_name.txt")
lines = f.readlines()
task1 = lines[0].rstrip()
task2 = lines[1].rstrip()
task3 = lines[2].rstrip()
task4 = lines[3].rstrip()
and then use:
task1 >> task2 >> task3 >> task4
I get the error:
Broken DAG: unsupported operand type(s) for >>: 'str' and 'str'
Any suggestions?
Actual code:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.helpers import chain
from airflow.utils.dates import days_ago
from airflow import AirflowException
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
}
# Create DAG instance
dag = DAG(
    'test',
    default_args=default_args,
    description='A simple tutorial DAG',
    catchup=False,
    schedule_interval=timedelta(seconds=20),
)
# First task
t1 = BashOperator(
    task_id='task1',
    bash_command='echo "task 1"',
    dag=dag,
)
# Second task
t2 = BashOperator(
    task_id='task2',
    bash_command='echo "task2"',
    dag=dag,
)
# same for t3 and t4
f= open("file_name.txt")
lines = f.readlines()
task1 = lines[0].rstrip()
task2 = lines[1].rstrip()
task3 = lines[2].rstrip()
task4 = lines[3].rstrip()
f.close()
task1 >> task2 >> task4 >> task3
file_name.txt can contain:
t1
t2
t4
t3
I want this order, but it has to come from an external file rather than being written in the DAG file itself.
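This helped: look up each name read from the file as a module-level variable, so the string 't1' is turned back into the operator object t1.
import sys
def str_to_class(classname):
    # Resolve a variable name (e.g. 't1') to the object defined in this module
    return getattr(sys.modules[__name__], classname)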
task1 = ''
task2 = ''
task3 = ''
task4 = ''
f= open("(path_to_scheduler_list.txt")
lines = f.readlines()
task1 = lines[0].rstrip()
task2 = lines[1].rstrip()
task3 = lines[2].rstrip()
task4 = lines[3].rstrip()
f.close()
task1 = str_to_class(task1)
task2 = str_to_class(task2)
task3 = str_to_class(task3)
task4 = str_to_class(task4)
task1 >> task2 >> task3 >> task4
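A variation on the same idea, offered as a minimal sketch rather than a definitive implementation: instead of resolving names through sys.modules, keep an explicit dictionary from the names used in the text file to the task objects, and link consecutive entries in a loop so the number of tasks is not fixed at four. The names read_order, chain_in_order, and registry are hypothetical, and FakeTask is only a stand-in that mimics the >> operator so the sketch runs without Airflow installed. In a real DAG file the dictionary would hold the actual operators, for example {"t1": t1, "t2": t2, "t3": t3, "t4": t4}.

```python
import os
import tempfile


class FakeTask:
    """Stand-in for an Airflow operator; it only mimics the >> behaviour."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Record the dependency and return the right-hand task,
        # so that a >> b >> c chains left to right.
        self.downstream.append(other)
        return other


def read_order(path):
    """Return the non-empty, stripped lines of the order file."""
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]


def chain_in_order(names, registry):
    """Look up each name in registry and link consecutive tasks with >>."""
    unknown = [n for n in names if n not in registry]
    if unknown:
        raise KeyError(f"unknown task names in order file: {unknown}")
    tasks = [registry[n] for n in names]
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream
    return tasks


# Assumption: in the DAG file this would map names to the real operators.
registry = {name: FakeTask(name) for name in ("t1", "t2", "t3", "t4")}

# Write a sample order file so the sketch is runnable on its own.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("t1\nt2\nt4\nt3\n")
    order_path = tmp.name

ordered = chain_in_order(read_order(order_path), registry)
os.remove(order_path)
print(" >> ".join(t.task_id for t in ordered))
```

An explicit dictionary limits the lookup to names you chose to expose, and a misspelled name in the text file fails with a clear KeyError instead of an AttributeError on the module.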