Can I execute python scripts from within a cloud composer DAG?

(First time using Cloud Composer.) All the examples I've seen define very simple Python functions inside the DAG file itself.

I have several lengthy Python scripts I want to run. Can I put them in tasks?

If so, is it better to use a PythonOperator or to call them from a BashOperator?

For example, something like:

default_dag_args = {}
with models.DAG('jobname', schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:
    do_stuff1 = python_operator.PythonOperator(
        task_id='task_1',
        python_callable=myscript1.py)
    do_stuff2 = python_operator.PythonOperator(
        task_id='task_2',
        python_callable=myscript2.py)

If you put the Python scripts into separate files, you can actually execute them with either the PythonOperator or the BashOperator.

Suppose you place the Python scripts under the following folder structure:

dags/
    my_dag.py
    tasks/
         myscript1.py
         myscript2.py

Using PythonOperator in my_dag.py
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from tasks import myscript1, myscript2

default_dag_args = {}

with DAG(
    "jobname",
    schedule_interval=timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    do_stuff1 = PythonOperator(
        task_id="task_1",
        python_callable=myscript1.main,  # assume entrypoint is main()
    )
    do_stuff2 = PythonOperator(
        task_id="task_2",
        python_callable=myscript2.main,  # assume entrypoint is main()
    )
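Since `python_callable` must be a callable (not a file path), each script needs an importable entrypoint. A minimal sketch of what `tasks/myscript1.py` might look like, assuming `main()` as the entrypoint name (any callable works as long as the DAG imports it):

```python
# tasks/myscript1.py -- hypothetical sketch of a script usable by PythonOperator.
# The entrypoint name main() is an assumption, not required by Airflow.

def do_work():
    # Placeholder for the script's real (lengthy) logic.
    return "done"

def main():
    # The DAG imports this function and passes it as python_callable.
    # Its return value is pushed to XCom by the PythonOperator.
    return do_work()

if __name__ == "__main__":
    # Lets the same file also be run standalone: python myscript1.py
    main()
```

Keeping the heavy logic in `do_work()` and the thin `main()` wrapper separate makes the script easy to call both from the DAG and from the command line.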

Using BashOperator in my_dag.py
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

default_dag_args = {}

with DAG(
    "jobname",
    schedule_interval=timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    do_stuff1 = BashOperator(
        task_id="task_1",
        bash_command="python /path/to/myscript1.py",
    )
    do_stuff2 = BashOperator(
        task_id="task_2",
        bash_command="python /path/to/myscript2.py",
    )
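Rather than hard-coding `/path/to/`, one option is to derive the script path from the DAG file's own location, so the command keeps working wherever the `dags/` folder is mounted. A sketch, assuming the `tasks/` subfolder layout shown above:

```python
import os

# Directory containing this DAG file (my_dag.py).
DAG_DIR = os.path.dirname(os.path.abspath(__file__))

# Path to the script inside the tasks/ subfolder next to the DAG file.
SCRIPT_PATH = os.path.join(DAG_DIR, "tasks", "myscript1.py")

# Use this as the operator's bash_command instead of a hard-coded path.
bash_command = f"python {SCRIPT_PATH}"
```

This avoids breakage when the environment's bucket mount point differs from your local layout.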