Kubeflow 管道终止通知
Kubeflow Pipeline Termination Notificaiton
我尝试添加一个逻辑,当管道因某些错误而终止时,该逻辑将发送松弛通知。我试图用 ExitHandler
来实现它。但是,似乎 ExitHandler
不能依赖于任何操作。你有什么好主意吗?
我找到了一个使用 ExitHandler
的解决方案。我post下面是我的代码,希望它能帮助别人。
def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
"""
performs slack notifications
"""
send_slack_op = dsl.ContainerOp(
name=name,
image='wenmin.wu/slack-cli:latest',
is_exit_handler=is_exit_handler,
command=['sh', '-c'],
arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
)
send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
return send_slack_op
@dsl.pipeline(
name='forecasting-supply',
description='forecasting supply ...'
)
def ml_pipeline(
param1,
param2,
param3,
):
exit_task = slack_notification(
slack_channel = slack_channel,
name = "supply-forecasting",
status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
is_exit_handler = True
)
with dsl.ExitHandler(exit_task):
# put other tasks here
我尝试添加一个逻辑,当管道因某些错误而终止时,该逻辑将发送松弛通知。我试图用 ExitHandler
来实现它。但是,似乎 ExitHandler
不能依赖于任何操作。你有什么好主意吗?
我找到了一个使用 ExitHandler
的解决方案。我post下面是我的代码,希望它能帮助别人。
def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
"""
performs slack notifications
"""
send_slack_op = dsl.ContainerOp(
name=name,
image='wenmin.wu/slack-cli:latest',
is_exit_handler=is_exit_handler,
command=['sh', '-c'],
arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
)
send_slack_op.add_env_variable(V1EnvVar(name = 'SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN'))))
return send_slack_op
@dsl.pipeline(
name='forecasting-supply',
description='forecasting supply ...'
)
def ml_pipeline(
param1,
param2,
param3,
):
exit_task = slack_notification(
slack_channel = slack_channel,
name = "supply-forecasting",
status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
is_exit_handler = True
)
with dsl.ExitHandler(exit_task):
# put other tasks here