Composer 没有看到数据流作业成功
Composer does not see dataflow job succeeded
我正在使用 Gcloud Composer 启动 Dataflow 作业。
我的 DAG 包含两个 Dataflow 作业,它们应该 运行 一个接一个。
import datetime
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models
default_dag_args = {
'start_date': datetime.datetime(2019, 10, 23),
'dataflow_default_options': {
'project': 'myproject',
'region': 'europe-west1',
'zone': 'europe-west1-c',
'tempLocation': 'gs://somebucket/',
}
}
with models.DAG(
'some_name',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
parameters = {'params': "param1"}
t1 = DataflowTemplateOperator(
task_id='dataflow_example_01',
template='gs://path/to/template/template_001',
parameters=parameters,
dag=dag)
parameters2 = {'params':"param2"}
t2 = DataflowTemplateOperator(
task_id='dataflow_example_02',
template='gs://path/to/templates/template_002',
parameters=parameters2,
dag=dag
)
t1 >> t2
当我签入数据流时,作业成功了,它应该制作的所有文件都已创建,但它看起来 运行 在美国地区,云作曲家环境在欧洲西部。
在气流中我可以看到第一个工作仍然是 运行ning 所以第二个工作没有启动
我应该向 DAG 添加什么才能使其成功?我如何 运行 在欧洲?
如有任何关于如何进行的建议或解决方案,我们将不胜感激。谢谢!
我过去不得不解决这个问题。在 Airflow 1.10.2 (or lower) the code calls to the service.projects().templates().launch()
endpoint. This was fixed in 1.10.3 中,使用区域性的代替:service.projects().locations().templates().launch()
.
截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer 中。
为此我们可以为我们自己的版本覆盖 DataflowTemplateOperator
RegionalDataflowTemplateOperator
:
class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
def execute(self, context):
hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
self.parameters, self.template)
现在将使用修改后的 RegionalDataFlowHook
,它会覆盖 DataFlowHook
运算符的 start_template_dataflow
方法来调用正确的端点:
class RegionalDataFlowHook(DataFlowHook):
def _start_template_dataflow(self, name, variables, parameters,
dataflow_template):
...
request = service.projects().locations().templates().launch(
projectId=variables['project'],
location=variables['region'],
gcsPath=dataflow_template,
body=body
)
...
return response
然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:
task = RegionalDataflowTemplateOperator(
task_id=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/europe/output'.format(BUCKET)
},
dag=dag,
)
完整的 DAG here。对于更清洁的版本,可以将运算符移到单独的模块中。
我正在使用 Gcloud Composer 启动 Dataflow 作业。
我的 DAG 包含两个 Dataflow 作业,它们应该 运行 一个接一个。
import datetime
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models
default_dag_args = {
'start_date': datetime.datetime(2019, 10, 23),
'dataflow_default_options': {
'project': 'myproject',
'region': 'europe-west1',
'zone': 'europe-west1-c',
'tempLocation': 'gs://somebucket/',
}
}
with models.DAG(
'some_name',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
parameters = {'params': "param1"}
t1 = DataflowTemplateOperator(
task_id='dataflow_example_01',
template='gs://path/to/template/template_001',
parameters=parameters,
dag=dag)
parameters2 = {'params':"param2"}
t2 = DataflowTemplateOperator(
task_id='dataflow_example_02',
template='gs://path/to/templates/template_002',
parameters=parameters2,
dag=dag
)
t1 >> t2
当我签入数据流时,作业成功了,它应该制作的所有文件都已创建,但它看起来 运行 在美国地区,云作曲家环境在欧洲西部。
在气流中我可以看到第一个工作仍然是 运行ning 所以第二个工作没有启动
我应该向 DAG 添加什么才能使其成功?我如何 运行 在欧洲?
如有任何关于如何进行的建议或解决方案,我们将不胜感激。谢谢!
我过去不得不解决这个问题。在 Airflow 1.10.2 (or lower) the code calls to the service.projects().templates().launch()
endpoint. This was fixed in 1.10.3 中,使用区域性的代替:service.projects().locations().templates().launch()
.
截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer 中。
为此我们可以为我们自己的版本覆盖 DataflowTemplateOperator
RegionalDataflowTemplateOperator
:
class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
def execute(self, context):
hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
self.parameters, self.template)
现在将使用修改后的 RegionalDataFlowHook
,它会覆盖 DataFlowHook
运算符的 start_template_dataflow
方法来调用正确的端点:
class RegionalDataFlowHook(DataFlowHook):
def _start_template_dataflow(self, name, variables, parameters,
dataflow_template):
...
request = service.projects().locations().templates().launch(
projectId=variables['project'],
location=variables['region'],
gcsPath=dataflow_template,
body=body
)
...
return response
然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:
task = RegionalDataflowTemplateOperator(
task_id=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/europe/output'.format(BUCKET)
},
dag=dag,
)
完整的 DAG here。对于更清洁的版本,可以将运算符移到单独的模块中。