Airflow: Issues with Calling TaskGroup
I'm having an issue when calling TaskGroups: the error log thinks my job ID is avg_speed_20220502_22c11bdf instead of avg_speed, and I don't understand why.
Here's my code:
with DAG(
    'debug_bigquery_data_analytics',
    catchup=False,
    default_args=default_arguments) as dag:

    # Note to self: the bucket region and the dataproc cluster should be in the same region
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        ...
    )

    with TaskGroup(group_id='weekday_analytics') as weekday_analytics:
        avg_temperature = DummyOperator(task_id='avg_temperature')
        avg_tire_pressure = DummyOperator(task_id='avg_tire_pressure')
        avg_speed = DataprocSubmitPySparkJobOperator(
            task_id='avg_speed',
            project_id='...',
            main=f'gs://.../.../avg_speed.py',
            cluster_name=f'spark-cluster-{{ ds_nodash }}',
            region='...',
            dataproc_jars=['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
        )
        avg_temperature >> avg_tire_pressure >> avg_speed

    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id='...',
        cluster_name='spark-cluster-{{ ds_nodash }}',
        region='...',
        trigger_rule='all_done',
    )

    create_cluster >> weekday_analytics >> delete_cluster
Here's the error message I'm getting:
google.api_core.exceptions.InvalidArgument: 400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9\-\_]{0,98}[a-zA-Z0-9])?' pattern
[2022-05-02, 11:46:11 UTC] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=debug_bigquery_data_analytics, task_id=weekday_analytics.avg_speed, execution_date=20220502T184410, start_date=20220502T184610, end_date=20220502T184611
[2022-05-02, 11:46:11 UTC] {standard_task_runner.py:93} ERROR - Failed to execute job 549 for task weekday_analytics.avg_speed (400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9\-\_]{0,98}[a-zA-Z0-9])?' pattern; 18116)
[2022-05-02, 11:46:11 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-02, 11:46:11 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check
In Airflow, a task's identifier is its task_id. However, when using TaskGroups you can have the same task_id in different groups, which is why a task defined inside a task group gets an identifier of the form group_id.task_id.
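A minimal sketch illustrating the prefixing (hypothetical DAG name, reusing the group and task names from the question):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG('prefix_demo', start_date=datetime(2022, 5, 1), schedule_interval=None) as dag:
    with TaskGroup(group_id='weekday_analytics') as weekday_analytics:
        avg_speed = DummyOperator(task_id='avg_speed')

# The group_id gets prepended to the task_id:
print(avg_speed.task_id)  # -> weekday_analytics.avg_speed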
For apache-airflow-providers-google>7.0.0:
The bug has been fixed; it should now work.
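If you're unsure which provider version you're running, here is a quick check (a minimal sketch using only the standard library):

# importlib.metadata is stdlib since Python 3.8
from importlib.metadata import version

# Prints the installed version of the Google provider package
print(version('apache-airflow-providers-google'))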
For apache-airflow-providers-google<=7.0.0:
You are experiencing the problem because DataprocJobBaseOperator has:
:param job_name: The job name used in the DataProc cluster. This name by default
is the task_id appended with the execution data, but can be templated. The
name will always be appended with a random number to avoid name clashes.
The problem is that Airflow adds a . character to the job name, which Google does not accept. So to fix your issue, you need to override the default of the job_name parameter with a string of your choice. You can set it to the task_id if you wish.
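For example, a minimal sketch of the override, reusing the operator arguments from the question (note that, per the docstring above, a random suffix is still appended to the job name to avoid clashes):

avg_speed = DataprocSubmitPySparkJobOperator(
    task_id='avg_speed',
    job_name='avg_speed',  # replaces the default '<group_id>.<task_id>_<date>' name, so no '.' reaches Google
    project_id='...',
    main='gs://.../.../avg_speed.py',
    cluster_name='spark-cluster-{{ ds_nodash }}',
    region='...',
    dataproc_jars=['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
)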
I opened https://github.com/apache/airflow/issues/23439 to report this bug; in the meantime, you can follow the suggestion above.