Composer 没有看到数据流作业成功

Composer does not see dataflow job succeeded

我正在使用 Gcloud Composer 启动 Dataflow 作业。

我的 DAG 包含两个 Dataflow 作业,它们应该 运行 一个接一个。

import datetime

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models


default_dag_args = {

    'start_date': datetime.datetime(2019, 10, 23),
    'dataflow_default_options': {
               'project': 'myproject',
               'region': 'europe-west1',
               'zone': 'europe-west1-c',
               'tempLocation': 'gs://somebucket/',
               }
}

with models.DAG(
        'some_name',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    parameters = {'params': "param1"}

    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        dag=dag)

    parameters2 = {'params':"param2"}

    t2 = DataflowTemplateOperator(
        task_id='dataflow_example_02',
        template='gs://path/to/templates/template_002',
        parameters=parameters2,
        dag=dag
    )

    t1 >> t2

当我签入数据流时,作业成功了,它应该制作的所有文件都已创建,但它看起来 运行 在美国地区,云作曲家环境在欧洲西部。

在气流中我可以看到第一个工作仍然是 运行ning 所以第二个工作没有启动

我应该向 DAG 添加什么才能使其成功?我如何 运行 在欧洲?

如有任何关于如何进行的建议或解决方案,我们将不胜感激。谢谢!

我过去不得不解决这个问题。在 Airflow 1.10.2 (or lower) the code calls to the service.projects().templates().launch() endpoint. This was fixed in 1.10.3 中,使用区域性的代替:service.projects().locations().templates().launch().

截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer 中。

为此我们可以为我们自己的版本覆盖 DataflowTemplateOperator RegionalDataflowTemplateOperator:

class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
  def execute(self, context):
    hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                        delegate_to=self.delegate_to,
                        poll_sleep=self.poll_sleep)

    hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
                                 self.parameters, self.template)

现在将使用修改后的 RegionalDataFlowHook,它会覆盖 DataFlowHook 运算符的 start_template_dataflow 方法来调用正确的端点:

class RegionalDataFlowHook(DataFlowHook):
  def _start_template_dataflow(self, name, variables, parameters,
                               dataflow_template):
      ...
      request = service.projects().locations().templates().launch(
          projectId=variables['project'],
          location=variables['region'],
          gcsPath=dataflow_template,
          body=body
      )
      ...
      return response

然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:

task = RegionalDataflowTemplateOperator(
    task_id=JOB_NAME,
    template=TEMPLATE_PATH,
    parameters={
        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://{}/europe/output'.format(BUCKET)
    },
    dag=dag,
)

完整的 DAG here。对于更清洁的版本,可以将运算符移到单独的模块中。