如何传递动态参数 Airflow 运算符?
How to pass dynamic arguments Airflow operator?
我正在使用 Airflow 在 Google Cloud Composer 上 运行 Spark 作业。我需要
- 创建集群(YAML参数由用户提供)
- spark 作业列表(作业参数也由每个作业 YAML 提供)
使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。
但是,考虑 DataprocClusterCreateOperator()
cluster_name
project_id
zone
和其他一些参数被标记为模板化。
如果我想传入其他参数作为模板(目前不是这样)怎么办? - 喜欢 image_version
,
num_workers
、worker_machine_type
等等?
有什么解决方法吗?
不确定'dynamic'是什么意思,但是当yaml文件更新时,如果读取文件进程在dag文件体中,dag将被刷新以从yaml文件申请新的args。所以实际上,您不需要 XCOM 来获取参数。
只需简单地创建一个参数字典然后传递给 default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
但是如果您想要动态 dag 而不是参数,您可能需要其他策略,例如 this。
所以你可能需要弄清楚基本思路:
动态在哪个层次?任务等级? DAG 级别?
或者您可以创建自己的 Operator 来完成这项工作并获取参数。
我正在使用 Airflow 在 Google Cloud Composer 上 运行 Spark 作业。我需要
- 创建集群(YAML参数由用户提供)
- spark 作业列表(作业参数也由每个作业 YAML 提供)
使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。
但是,考虑 DataprocClusterCreateOperator()
cluster_name
project_id
zone
和其他一些参数被标记为模板化。
如果我想传入其他参数作为模板(目前不是这样)怎么办? - 喜欢 image_version
,
num_workers
、worker_machine_type
等等?
有什么解决方法吗?
不确定'dynamic'是什么意思,但是当yaml文件更新时,如果读取文件进程在dag文件体中,dag将被刷新以从yaml文件申请新的args。所以实际上,您不需要 XCOM 来获取参数。 只需简单地创建一个参数字典然后传递给 default_args:
CONFIGFILE = os.path.join(
os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')
with open(CONFIGFILE, 'r') as ymlfile:
CFG = yaml.load(ymlfile)
default_args = {
'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
'project_id': CFG['section_A']['project_id'],
'zone': CFG['section_A']['zone'],
'mage_version': CFG['section_A']['image_version'],
'num_workers': CFG['section_A']['num_workers'],
'worker_machine_type': CFG['section_A']['worker_machine_type'],
# you can add all needs params here.
}
DAG = DAG(
dag_id=DAG_NAME,
schedule_interval=SCHEDULE_INTEVAL,
default_args=default_args, # pass the params to DAG environment
)
Task1 = DataprocClusterCreateOperator(
task_id='your_task_id',
dag=DAG
)
但是如果您想要动态 dag 而不是参数,您可能需要其他策略,例如 this。
所以你可能需要弄清楚基本思路: 动态在哪个层次?任务等级? DAG 级别?
或者您可以创建自己的 Operator 来完成这项工作并获取参数。