Can I execute python scripts from within a Cloud Composer DAG?
(First time using Cloud Composer.) All of the examples I have seen define very short python functions within the DAG.
I have multiple lengthy python scripts I want to run. Can I put them inside tasks?
If so, is it better to use the PythonOperator or to call them from a BashOperator?
For example, something like:
default_dag_args = {}

with models.DAG('jobname', schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:
    do_stuff1 = python_operator.PythonOperator(
        task_id='task_1',
        python_callable=myscript1.py)
    do_stuff2 = python_operator.PythonOperator(
        task_id='task_2',
        python_callable=myscript2.py)
If you put the python scripts into separate files, you can actually use either the PythonOperator or the BashOperator to execute them.
Assume you place the python scripts under the following folder structure:
dags/
    my_dag.py
    scripts/
        myscript1.py
        myscript2.py
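For the PythonOperator approach, each script needs an importable entrypoint. A minimal sketch of what scripts/myscript1.py might look like (the main() name is an assumption, carried through the examples below):

# scripts/myscript1.py
def main():
    # the lengthy script body goes here
    print("running myscript1")

if __name__ == "__main__":
    main()  # lets the same file also be run directly as `python myscript1.py`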
Using PythonOperator in my_dag.py:
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from scripts import myscript1, myscript2

default_dag_args = {}

with DAG(
    "jobname",
    schedule_interval=timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    do_stuff1 = PythonOperator(
        task_id="task_1",
        python_callable=myscript1.main,  # assume entrypoint is main()
    )
    do_stuff2 = PythonOperator(
        task_id="task_2",
        python_callable=myscript2.main,  # assume entrypoint is main()
    )
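If a script's main() takes arguments, PythonOperator can forward them with op_args / op_kwargs. A minimal sketch, assuming a hypothetical main(run_date) signature in myscript1.py:

    do_stuff1 = PythonOperator(
        task_id="task_1",
        python_callable=myscript1.main,
        op_kwargs={"run_date": "2024-01-01"},  # hypothetical keyword argument of main()
    )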
Using BashOperator in my_dag.py:
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

default_dag_args = {}

with DAG(
    "jobname",
    schedule_interval=timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    do_stuff1 = BashOperator(
        task_id="task_1",
        bash_command="python /path/to/myscript1.py",
    )
    do_stuff2 = BashOperator(
        task_id="task_2",
        bash_command="python /path/to/myscript2.py",
    )
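With BashOperator you need to know where the script lives on the worker. One way to avoid hard-coding /path/to is to build the path relative to the DAG file itself. A sketch, assuming the scripts/ folder sits next to my_dag.py as in the layout above:

import os
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# Resolve scripts/ relative to this DAG file instead of hard-coding an absolute path
SCRIPTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "scripts")

with DAG(
    "jobname",
    schedule_interval=timedelta(days=1),
    default_args={},
) as dag:
    do_stuff1 = BashOperator(
        task_id="task_1",
        bash_command=f"python {SCRIPTS_DIR}/myscript1.py",
    )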