创建后有没有办法编辑气流操作员?
Is there any way to edit an airflow operator after creation?
我有一个 python 脚本,它基于映射每个所需选项的 JSON 文件动态创建任务(气流操作员)和 DAG。
该脚本还专门用于创建所需的任何操作员的功能。
有时我想根据映射激活一些条件选项...例如在 bigqueryOperator 中有时我需要一个 time_partitioning 和一个 destination_table,但我不想在每个映射任务上设置。
我试图阅读有关 BaseOperator 的文档,但我看不到任何 java 类似的设置方法。
return 运算符的函数,例如 bigQuery 一个
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
##destination_dataset_table=project+'.'+dataset+'.'+mappedTask.get('target'),
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
##time_partitioning=mappedTask.get('time_partitioning'),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
logger.error(error)
raise Exception(error)
mappedTask inside json file without partitioning
{
"task_id": "TEST_TASK_ID",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false
},
mappedTask inside json 带分区的文件
{
"task_id": "TEST_TASK_ID_PARTITION",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false,
"targetTable": "TARGET_TABLE",
"time_partitioning": {
"field": "DATE_TO_PART",
"type": "DAY"
}
},
python中没有私有方法和字段,可以像
一样直接设置和获取字段
op.use_legacy_sql = True
鉴于我强烈反对这样做,因为这是真正的代码味道。相反,您可以修改工厂 class 以将一些默认值应用于 json 数据。
或者更好的是,对 json 本身应用默认值。比保存和使用更新 json。这将使事情更容易预测。
如下更改 bqOperator
来处理这种情况,基本上它会通过 None 当它在您的 json:
中找不到该字段时
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
destination_dataset_table="{}.{}.{}".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None) else None,
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
time_partitioning=mappedTask.get('time_partitioning', None),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
logger.error(error)
raise Exception(error)
我有一个 python 脚本,它基于映射每个所需选项的 JSON 文件动态创建任务(气流操作员)和 DAG。 该脚本还专门用于创建所需的任何操作员的功能。 有时我想根据映射激活一些条件选项...例如在 bigqueryOperator 中有时我需要一个 time_partitioning 和一个 destination_table,但我不想在每个映射任务上设置。
我试图阅读有关 BaseOperator 的文档,但我看不到任何 java 类似的设置方法。
return 运算符的函数,例如 bigQuery 一个
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
##destination_dataset_table=project+'.'+dataset+'.'+mappedTask.get('target'),
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
##time_partitioning=mappedTask.get('time_partitioning'),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
logger.error(error)
raise Exception(error)
mappedTask inside json file without partitioning
{
"task_id": "TEST_TASK_ID",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false
},
mappedTask inside json 带分区的文件
{
"task_id": "TEST_TASK_ID_PARTITION",
"sql": "some fancy query",
"type": "bqOperator",
"dependencies": [],
"write_disposition": "WRITE_APPEND",
"allow_large_results": true,
"createDisposition": "CREATE_IF_NEEDED",
"use_legacy_sql": false,
"targetTable": "TARGET_TABLE",
"time_partitioning": {
"field": "DATE_TO_PART",
"type": "DAY"
}
},
python中没有私有方法和字段,可以像
一样直接设置和获取字段op.use_legacy_sql = True
鉴于我强烈反对这样做,因为这是真正的代码味道。相反,您可以修改工厂 class 以将一些默认值应用于 json 数据。 或者更好的是,对 json 本身应用默认值。比保存和使用更新 json。这将使事情更容易预测。
如下更改 bqOperator
来处理这种情况,基本上它会通过 None 当它在您的 json:
def bqOperator(mappedTask):
try:
return BigQueryOperator(
task_id=mappedTask.get('task_id'),
sql=mappedTask.get('sql'),
destination_dataset_table="{}.{}.{}".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None) else None,
write_disposition=mappedTask.get('write_disposition'),
allow_large_results=mappedTask.get('allow_large_results'),
time_partitioning=mappedTask.get('time_partitioning', None),
use_legacy_sql=mappedTask.get('use_legacy_sql'),
dag=dag,
)
except Exception as e:
error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
logger.error(error)
raise Exception(error)