Monitor/alert for orchestration in Dagster/Python: how to record spans?
I'm writing a data-processing pipeline orchestrated in Dagster and I want to add monitoring/alerting.
To simplify the use case: we process a few thousand small pieces of data, each of which can be handled by one of 4-5 different major workflows. I want to track how long each piece of data takes to be fully processed and alert if any piece takes more than 1 hour. I also want to track how much data each workflow processes and alert if any workflow deviates too far from its normal volume.
The challenge I'm running into is that OpenTelemetry expects spans to be delimited with context managers:
with tracer.start_as_current_span("span-name") as span:
    # do some work
However, the work in my pipeline is split across multiple Python functions that the Dagster orchestration framework ties together. In production, the @ops will run on separate Kubernetes nodes. Here's an example:
from typing import List

from dagster import DynamicOut, DynamicOutput, job, op


@op(
    out=DynamicOut(str),
)
def find_small_data_sets(context):
    """Starts 1 dataset going through the pipeline."""
    datasets = db.list_some_things()
    for dataset in datasets:
        yield DynamicOutput(value=dataset, mapping_key=dataset)


@op
def process_data_part_one(data: str) -> str:
    pass  # Do some work on one of the data sets.


@op
def process_data_part_two(data: str) -> int:
    # Do more work on a data set.
    # Conceptually this would be part of the same span
    # as process_data_part_one.
    pass


@op
def workflow_done(outputs: List[int]) -> int:
    # Finish up the workflow. Here is where a workflow-level
    # span might end.
    return sum(outputs)


@job
def do_full_orchestrated_job():
    """This function defines the DAG structure.
    It does not perform the actual runtime execution of my job
    when it gets called.
    """
    datasets = find_small_data_sets()
    processed_datasets = (
        datasets
        .map(process_data_part_one)
        .map(process_data_part_two)
    )
    workflow_done(processed_datasets.collect())
Given that I don't have the ability to instrument the Dagster orchestration framework itself, is there a way to use OpenTelemetry to monitor my pipeline? Is it possible to start and end a span in different functions (without a context manager), especially if the start and end actually run on different CPUs? Or is there a better tool for this kind of monitoring/alerting?
Thanks for reading!
You don't have to use a context manager when working with spans. You can start a span in one function and end it in another. Below are some example snippets showing how you could do that.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)


# A single span started in one function and ended in another.
def foo(span):
    span.set_attribute("my_custom_attribute", "foo")
    span.end()


def bar():
    span = tracer.start_span("bar")
    foo(span)
    # end


# Demonstrate `use_span(...)`.
def bob(span):
    with trace.use_span(span, end_on_exit=False):
        # Use the passed-in span as the current span so that the internal
        # span is recorded as its child in the trace.
        with tracer.start_as_current_span("internal") as internal:
            internal.set_attribute("k", "v")
    # `span` is still not ended because `end_on_exit` is set to False.
    # You may pass the span around to other functions before
    # ending it manually.
    span.end()


def alice():
    span = tracer.start_span("alice")
    bob(span)
    # end


if __name__ == '__main__':
    bar()
    alice()
I ended up using custom trace propagation so that the Dagster job has one shared trace and each op then gets its own span within it. So I'm not (de)serializing spans, just the trace context. See the conceptual propagation docs and the Python propagation docs.
A simple example of trace context propagation in Python:
from opentelemetry import context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


def get_trace_context():
    """Returns a carrier dict with the current trace context injected into it."""
    carrier = {}
    TraceContextTextMapPropagator().inject(carrier)
    return carrier


def set_trace_context(carrier):
    """Sets the current trace context from the given propagated trace context carrier."""
    ctx = TraceContextTextMapPropagator().extract(carrier)
    context.attach(ctx)
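To make the mechanics concrete, here is a minimal sketch (not from the original setup) of how these two helpers could be used across a process boundary; the producer_side/consumer_side names are just for illustration, and a real pipeline would hand the serialized carrier over however your orchestration passes data around:

import json

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


def producer_side():
    # Start a workflow-level span and capture the trace context while
    # that span is current.
    with tracer.start_as_current_span("workflow"):
        carrier = get_trace_context()
    # The carrier is a plain dict (roughly {"traceparent": "00-...-01"}),
    # so it can be serialized and handed to another process.
    return json.dumps(carrier)


def consumer_side(serialized_carrier):
    # Re-attach the propagated context, then start a span; it is
    # recorded as part of the same trace as the producer's span.
    set_trace_context(json.loads(serialized_carrier))
    with tracer.start_as_current_span("op-level-work"):
        pass  # do the actual op work here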
For Dagster specifically, there isn't a built-in mechanism that works well as a place to store the trace context. Resources are re-initialized for each op execution context (e.g. per process with the default multiprocess executor), so a resource can't create a single trace context shared by all the ops; you'd end up with separate traces. So I built something custom that stores and fetches the trace context keyed on the run ID (or a parent run ID). Here's the snippet that walks up the run ID ancestry:
def _get_run_id_and_ancestors(context):
    """Returns a list starting with the current run's ID and then any ancestor runs."""
    run_id = context.run_id
    run_ids = [run_id]
    while True:
        run = context.instance.get_run_by_id(run_id)
        if run.parent_run_id:
            run_id = run.parent_run_id
            run_ids.append(run_id)
        else:
            break
    return run_ids
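For completeness, here is a rough sketch of how such a run-ID-keyed store might be wired into an op. The dict standing in for the shared store, the helper names, and the op body are all hypothetical; in practice the store has to be something every op process/node can reach, such as a database table keyed by run ID.

from dagster import op
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

# Hypothetical stand-in for a shared store; a real one must be reachable
# from every op process/node (e.g. a database table keyed by run ID).
_trace_context_store = {}


def save_trace_context_for_run(context):
    """Stores the current trace context under the current run's ID."""
    _trace_context_store[context.run_id] = get_trace_context()


def load_trace_context_for_run(context):
    """Attaches the trace context stored for this run or the nearest ancestor run."""
    for run_id in _get_run_id_and_ancestors(context):
        carrier = _trace_context_store.get(run_id)
        if carrier is not None:
            set_trace_context(carrier)
            return


@op
def process_data_part_one(context, data: str) -> str:
    load_trace_context_for_run(context)
    # This span now joins the job's shared trace.
    with tracer.start_as_current_span("process_data_part_one"):
        return data  # do the real work here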