我可以使用 op_args 或 op_kwargs 将变量值从 PythonOperator in Airflow 传递到 Python 脚本吗?

Can I pass variable values to Python script from PythonOperator in Airflow using op_args or op_kwargs?

我可以从 airflow DAG 中使用 op_argsop_kwargs 将变量值作为参数传递给 python 脚本。在我的气流中,Dag 将我的脚本导入为 from scripts import my_script 我的 python 运算符看起来像这样

PythonOperator(
    task_id='xxxxxx',
    python_callable=my_script.main,
    op_args=[bucket_name, prefix, source_blob_name, dest_bucket_name],
    dag=dag,
    trigger_rule='all_success'
)

我在 Airflow 中声明了我的变量。我可以在这里调用我的值 bucket_name=Variable.get('bucket_name') 我想将 bucket_name 的值传递给我 Python 脚本中的变量是可能的那么?

您可以像这样在 PythonOperator 中使用参数:

PythonOperator(
...
params={"bucket_name": bucket_name, "key": value}
...
)

并检索它:

def main(**kwargs):
 bucket_name = kwargs.get("bucket_name")
...

在Python运算符中op_argsop_kwargstemplates_dict都是templated fields.

所以你可以这样做:

PythonOperator(
    ...,
    op_args=['{{ var.value.bucket_name }}'],
    python_callable=my_script.main
)

那么您的 Python 可调用对象将是:

def main(*op_args):
    bucket_name = op_args[0]

你也可以使用op_kwargs / templates_dict:

PythonOperator(
    ...,
    templates_dict={'bucket_name', '{{ var.value.bucket_name }}'},
    python_callable=my_script.main
)

那么您的 Python 可调用对象将是:

def main(bucket_name, **context):
    ...

但是没有必要做任何一个。 没有理由在 Python 可调用对象中传递可以直接访问的参数。

你可以这样做:

from airflow.models.variable import Variable
def main(**context):
    bucket_name = Variable.get('bucket_name')

这是绝对安全的,因为 main 仅在执行 PythonOperator 时被调用。