我可以使用 op_args 或 op_kwargs 将变量值从 PythonOperator in Airflow 传递到 Python 脚本吗?
Can I pass variable values to Python script from PythonOperator in Airflow using op_args or op_kwargs?
我可以从 airflow DAG 中使用 op_args
或 op_kwargs
将变量值作为参数传递给 python 脚本。在我的气流中,Dag 将我的脚本导入为 from scripts import my_script
我的 python 运算符看起来像这样
PythonOperator(
task_id='xxxxxx',
python_callable=my_script.main,
op_args=[bucket_name, prefix, source_blob_name, dest_bucket_name],
dag=dag,
trigger_rule='all_success'
)
我在 Airflow 中声明了我的变量。我可以在这里调用我的值 bucket_name=Variable.get('bucket_name')
我想将 bucket_name 的值传递给我 Python 脚本中的变量是可能的那么?
您可以像这样在 PythonOperator 中使用参数:
PythonOperator(
...
params={"bucket_name": bucket_name, "key": value}
...
)
并检索它:
def main(**kwargs):
bucket_name = kwargs.get("bucket_name")
...
在Python运算符中op_args
、op_kwargs
、templates_dict
都是templated fields.
所以你可以这样做:
PythonOperator(
...,
op_args=['{{ var.value.bucket_name }}'],
python_callable=my_script.main
)
那么您的 Python 可调用对象将是:
def main(*op_args):
bucket_name = op_args[0]
你也可以使用op_kwargs
/ templates_dict
:
PythonOperator(
...,
templates_dict={'bucket_name', '{{ var.value.bucket_name }}'},
python_callable=my_script.main
)
那么您的 Python 可调用对象将是:
def main(bucket_name, **context):
...
但是没有必要做任何一个。
没有理由在 Python 可调用对象中传递可以直接访问的参数。
你可以这样做:
from airflow.models.variable import Variable
def main(**context):
bucket_name = Variable.get('bucket_name')
这是绝对安全的,因为 main
仅在执行 PythonOperator
时被调用。
我可以从 airflow DAG 中使用 op_args
或 op_kwargs
将变量值作为参数传递给 python 脚本。在我的气流中,Dag 将我的脚本导入为 from scripts import my_script
我的 python 运算符看起来像这样
PythonOperator(
task_id='xxxxxx',
python_callable=my_script.main,
op_args=[bucket_name, prefix, source_blob_name, dest_bucket_name],
dag=dag,
trigger_rule='all_success'
)
我在 Airflow 中声明了我的变量。我可以在这里调用我的值 bucket_name=Variable.get('bucket_name')
我想将 bucket_name 的值传递给我 Python 脚本中的变量是可能的那么?
您可以像这样在 PythonOperator 中使用参数:
PythonOperator(
...
params={"bucket_name": bucket_name, "key": value}
...
)
并检索它:
def main(**kwargs):
bucket_name = kwargs.get("bucket_name")
...
在Python运算符中op_args
、op_kwargs
、templates_dict
都是templated fields.
所以你可以这样做:
PythonOperator(
...,
op_args=['{{ var.value.bucket_name }}'],
python_callable=my_script.main
)
那么您的 Python 可调用对象将是:
def main(*op_args):
bucket_name = op_args[0]
你也可以使用op_kwargs
/ templates_dict
:
PythonOperator(
...,
templates_dict={'bucket_name', '{{ var.value.bucket_name }}'},
python_callable=my_script.main
)
那么您的 Python 可调用对象将是:
def main(bucket_name, **context):
...
但是没有必要做任何一个。 没有理由在 Python 可调用对象中传递可以直接访问的参数。
你可以这样做:
from airflow.models.variable import Variable
def main(**context):
bucket_name = Variable.get('bucket_name')
这是绝对安全的,因为 main
仅在执行 PythonOperator
时被调用。