Airflow: Issues with Calling TaskGroup

I'm having trouble calling TaskGroups: the error log thinks my job ID is avg_speed_20220502_22c11bdf instead of avg_speed, and I don't understand why.

Here is my code:

with DAG(
        'debug_bigquery_data_analytics',
        catchup=False,
        default_args=default_arguments) as dag:

    # Note to self: the bucket region and the dataproc cluster should be in the same region
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        ...
    )

    with TaskGroup(group_id='weekday_analytics') as weekday_analytics:
        avg_temperature = DummyOperator(task_id='avg_temperature')

        avg_tire_pressure = DummyOperator(task_id='avg_tire_pressure')

        avg_speed = DataprocSubmitPySparkJobOperator(
            task_id='avg_speed',
            project_id='...',
            main='gs://.../.../avg_speed.py',
            # Plain string so Jinja can render it; an f-string would collapse {{ }} into { }
            cluster_name='spark-cluster-{{ ds_nodash }}',
            region='...',
            dataproc_jars=['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
        )

        avg_temperature >> avg_tire_pressure >> avg_speed

    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id='...',
        cluster_name='spark-cluster-{{ ds_nodash }}',
        region='...',
        trigger_rule='all_done',
    )

    create_cluster >> weekday_analytics >> delete_cluster

And here is the error message I get:

google.api_core.exceptions.InvalidArgument: 400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9\-\_]{0,98}[a-zA-Z0-9])?' pattern
[2022-05-02, 11:46:11 UTC] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=debug_bigquery_data_analytics, task_id=weekday_analytics.avg_speed, execution_date=20220502T184410, start_date=20220502T184610, end_date=20220502T184611
[2022-05-02, 11:46:11 UTC] {standard_task_runner.py:93} ERROR - Failed to execute job 549 for task weekday_analytics.avg_speed (400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9\-\_]{0,98}[a-zA-Z0-9])?' pattern; 18116)
[2022-05-02, 11:46:11 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-02, 11:46:11 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check

The Airflow task identifier is the task_id. However, when using TaskGroups you can have the same task_id in different groups, so a task defined inside a task group gets an identifier of group_id.task_id.
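As a quick illustration, this is roughly how the dotted identifier is composed (a simplified sketch, not the actual Airflow source; the helper name is mine):

```python
def full_task_id(task_id, group_id=None):
    """Compose the identifier Airflow uses for a task, prefixing the
    group_id when the task is defined inside a TaskGroup."""
    return f"{group_id}.{task_id}" if group_id else task_id

# A top-level task keeps its plain task_id ...
print(full_task_id("create_cluster"))                  # create_cluster
# ... but a task inside a group is prefixed with the group_id.
print(full_task_id("avg_speed", "weekday_analytics"))  # weekday_analytics.avg_speed
```

That dotted prefix is exactly what shows up in the failing job ID in the log above.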

For apache-airflow-providers-google>7.0.0:

The bug has been fixed, so it should now work out of the box.

For apache-airflow-providers-google<=7.0.0:

You are hitting the issue because DataprocJobBaseOperator has:

:param job_name: The job name used in the DataProc cluster. This name by default
    is the task_id appended with the execution data, but can be templated. The
    name will always be appended with a random number to avoid name clashes.

The problem is that Airflow adds the `.` character, which Google does not accept in job IDs. To work around it, you have to override the default value of the job_name parameter with a string of your choice. You can set it to the task_id if you like.
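You can verify this against the pattern quoted in the error message itself. A small stand-alone sketch (the regex is copied verbatim from the 400 response above; the helper name is mine):

```python
import re

# Job-ID pattern from the Dataproc 400 error message.
PATTERN = re.compile(r"[a-zA-Z0-9]([a-zA-Z0-9\-\_]{0,98}[a-zA-Z0-9])?")

def is_valid_job_id(job_id):
    """Return True if job_id fully matches Dataproc's allowed pattern."""
    return PATTERN.fullmatch(job_id) is not None

# The TaskGroup-prefixed name contains a '.', so Dataproc rejects it ...
print(is_valid_job_id("weekday_analytics.avg_speed_20220502_22c11bdf"))  # False
# ... while a name built from a plain job_name (e.g. the bare task_id) passes.
print(is_valid_job_id("avg_speed_20220502_22c11bdf"))                    # True
```

This is why overriding job_name with a dot-free string makes the submission succeed.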

I opened https://github.com/apache/airflow/issues/23439 to report this bug; in the meantime, you can apply the workaround above.