Dagster 故障通知系统

Dagster failure notification systems

dagster 有没有办法在发生某些事件(例如失败)时接收通知?例如,是否有与 sentry 等工具的集成可用?

有一个 datadog integration 允许用户将事件发送到 datadog。来自文档:

@solid(required_resource_keys={'datadog'})
def datadog_solid(context):
    dd = context.resources.datadog

    dd.event('Man down!', 'This server needs assistance.')
    dd.gauge('users.online', 1001, tags=["protocol:http"])
    dd.increment('page.views')
    dd.decrement('page.views')
    dd.histogram('album.photo.count', 26, tags=["gender:female"])
    dd.distribution('album.photo.count', 26, tags=["color:blue"])
    dd.set('visitors.uniques', 999, tags=["browser:ie"])
    dd.service_check('svc.check_name', dd.WARNING)
    dd.timing("query.response.time", 1234)

    # Use timed decorator
    @dd.timed('run_fn')
    def run_fn():
        pass

    run_fn()

@pipeline(mode_defs=[ModeDefinition(resource_defs={'datadog': datadog_resource})])
def dd_pipeline():
    datadog_solid()

result = execute_pipeline(
    dd_pipeline,
    {'resources': {'datadog': {'config': {'api_key': 'YOUR_KEY', 'app_key': 'YOUR_KEY'}}}},
)

为某些事件(即失败)添加第一个 class 用户可配置的挂钩目前正在进行中。

不确定在编写接受的答案时是否还不可用,但当前的 Dagster 版本 (0.9.16) 有更好的机制来解决手头的问题。

他们现在有一个 hook system,您可以在其中注释要在管道成功完成或失败时触发的函数。

文档中的代码示例:

@success_hook(required_resource_keys={'slack'})
def slack_on_success(context):
    message = 'solid {} succeeded'.format(context.solid.name)
    context.resources.slack.send_message(message)

@success_hook
def do_something_on_success(context):
    do_something()