如何在 Dataflow 上设置超时?
How can I set a timeout on Dataflow?
我正在使用 Composer 按计划运行 我的 Dataflow 管道。如果这项工作占用了一定时间,我希望它被杀死。有没有一种方法可以作为管道选项或 DAG 参数以编程方式执行此操作?
不确定如何将其作为管道配置选项,但这是一个想法。
您可以启动一个任务队列任务,并将倒计时设置为您的超时值。当任务启动时,您可以检查您的任务是否仍然 运行:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
如果是,您可以使用作业状态对其调用更新 JOB_STATE_CANCELLED
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate
这是通过 googleapiclient
库完成的:https://developers.google.com/api-client-library/python/apis/discovery/v1
这是一个如何使用它的例子
class DataFlowJobsListHandler(InterimAdminResourceHandler):
def get(self, resource_id=None):
"""
Wrapper to this:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
"""
if resource_id:
self.abort(405)
else:
credentials = GoogleCredentials.get_application_default()
service = discovery.build('dataflow', 'v1b3', credentials=credentials)
project_id = app_identity.get_application_id()
_filter = self.request.GET.pop('filter', 'UNKNOWN').upper()
jobs_list_request = service.projects().jobs().list(
projectId=project_id,
filter=_filter) #'ACTIVE'
jobs_list = jobs_list_request.execute()
return {
'$cursor': None,
'results': jobs_list.get('jobs', []),
}
我正在使用 Composer 按计划运行 我的 Dataflow 管道。如果这项工作占用了一定时间,我希望它被杀死。有没有一种方法可以作为管道选项或 DAG 参数以编程方式执行此操作?
不确定如何将其作为管道配置选项,但这是一个想法。
您可以启动一个任务队列任务,并将倒计时设置为您的超时值。当任务启动时,您可以检查您的任务是否仍然 运行:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
如果是,您可以使用作业状态对其调用更新 JOB_STATE_CANCELLED
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate
这是通过 googleapiclient
库完成的:https://developers.google.com/api-client-library/python/apis/discovery/v1
这是一个如何使用它的例子
class DataFlowJobsListHandler(InterimAdminResourceHandler):
def get(self, resource_id=None):
"""
Wrapper to this:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
"""
if resource_id:
self.abort(405)
else:
credentials = GoogleCredentials.get_application_default()
service = discovery.build('dataflow', 'v1b3', credentials=credentials)
project_id = app_identity.get_application_id()
_filter = self.request.GET.pop('filter', 'UNKNOWN').upper()
jobs_list_request = service.projects().jobs().list(
projectId=project_id,
filter=_filter) #'ACTIVE'
jobs_list = jobs_list_request.execute()
return {
'$cursor': None,
'results': jobs_list.get('jobs', []),
}