如何使用 AirFlow 运行 一个包含 python 个文件的文件夹?
How to use AirFlow to run a folder of python files?
我在包含 python 个文件的文件夹中有一系列 Python 任务:file1.py、file2.py、...
我阅读了 Airflow 文档,但我不知道如何在 DAG 中指定 python 文件的文件夹和文件名?
我想执行那些 python 文件(不是通过 Python 运算符执行 Python 函数)。
任务 1:执行 file1.py(使用一些导入包)
任务 2:执行 file2.py(使用其他导入包)
这会很有帮助。谢谢,问候
您可以使用 BashOperator 将 python 个文件作为任务执行
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': seven_days_ago,
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
)
dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
task_id='testairflow',
bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
dag=dag)
要使用 BashOperator
作为一个整体执行 python 文件(如 liferacer 的回答):
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id='bash_task',
bash_command='python file1.py',
dag=dag
)
然后,使用 PythonOperator
调用您的 main
函数。你应该已经有了一个 __main__
块,所以把里面发生的事情放到一个 main
函数中,这样你的 file1.py
看起来像这样:
def main():
"""This gets executed if `python file1` gets called."""
# my code
if __name__ == '__main__':
main()
那么你的dag定义:
from airflow.operators.python_operator import PythonOperator
import file1
python_task = PythonOperator(
task_id='python_task',
python_callable=file1.main,
dag=dag
)
我知道你在问 "would like to execute those python files (not the Python function through Python Operator)." 但我认为这可能比你更有效地使用 Airflow。我还看到以前写的答案很混乱,所以这是你想要的方式,也是我推荐的完成任务的方式:
假设:
dags/
my_dag_for_task_1_and_2.py
tasks/
file1.py
file2.py
您要求避免 PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='/path/to/python /path/to/dags/tasks/file1.py',
)
task_2 = BashOperator(
task_id='task_2',
bash_command='/path/to/python /path/to/dags/tasks/file2.py',
)
task_1 >> task_2
您没有为 Airflow 从头编写 Python,而是 PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = PythonOperator(
task_id='task_1',
python_callable=file1.function_in_file1,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=file2.function_in_file2, # maybe main?
)
task_1 >> task_2
我在包含 python 个文件的文件夹中有一系列 Python 任务:file1.py、file2.py、...
我阅读了 Airflow 文档,但我不知道如何在 DAG 中指定 python 文件的文件夹和文件名?
我想执行那些 python 文件(不是通过 Python 运算符执行 Python 函数)。
任务 1:执行 file1.py(使用一些导入包)
任务 2:执行 file2.py(使用其他导入包)
这会很有帮助。谢谢,问候
您可以使用 BashOperator 将 python 个文件作为任务执行
from airflow import DAG
from airflow.operators import BashOperator,PythonOperator
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': seven_days_ago,
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
)
dag = DAG('simple', default_args=default_args)
t1 = BashOperator(
task_id='testairflow',
bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
dag=dag)
要使用 BashOperator
作为一个整体执行 python 文件(如 liferacer 的回答):
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id='bash_task',
bash_command='python file1.py',
dag=dag
)
然后,使用 PythonOperator
调用您的 main
函数。你应该已经有了一个 __main__
块,所以把里面发生的事情放到一个 main
函数中,这样你的 file1.py
看起来像这样:
def main():
"""This gets executed if `python file1` gets called."""
# my code
if __name__ == '__main__':
main()
那么你的dag定义:
from airflow.operators.python_operator import PythonOperator
import file1
python_task = PythonOperator(
task_id='python_task',
python_callable=file1.main,
dag=dag
)
我知道你在问 "would like to execute those python files (not the Python function through Python Operator)." 但我认为这可能比你更有效地使用 Airflow。我还看到以前写的答案很混乱,所以这是你想要的方式,也是我推荐的完成任务的方式:
假设:
dags/
my_dag_for_task_1_and_2.py
tasks/
file1.py
file2.py
您要求避免 PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import BashOperator
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='/path/to/python /path/to/dags/tasks/file1.py',
)
task_2 = BashOperator(
task_id='task_2',
bash_command='/path/to/python /path/to/dags/tasks/file2.py',
)
task_1 >> task_2
您没有为 Airflow 从头编写 Python,而是 PythonOperator
:
# my_dag_for_task_1_and_2.py
import datetime as dt
from airflow import DAG
from airflow.operators import PythonOperator
import tasks.file1
import tasks.file2
with DAG(
'my_dag_for_task_1_and_2',
default_args={
'owner': 'me',
'start_date': datetime(…),
…,
},
schedule_interval='8 * * * *',
) as dag:
task_1 = PythonOperator(
task_id='task_1',
python_callable=file1.function_in_file1,
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=file2.function_in_file2, # maybe main?
)
task_1 >> task_2