Monitor/alert for orchestration in Dagster/Python, how to record spans?

I'm writing a data processing pipeline orchestrated in Dagster, and I want to add monitoring/alerting.

To simplify the use case: we process a few thousand small pieces of data, each of which is handled by one of 4-5 different major workflows. I want to track how long each piece of data takes to be fully processed, and alert if any piece takes > 1 hour. I also want to track how many pieces each workflow processes, and alert if any workflow's volume deviates too far from its normal value.

The challenge I'm facing is that OpenTelemetry expects spans to be delimited with context managers:

with tracer.start_as_current_span("span-name") as span:
    pass  # do some work

However, my pipeline's work is broken up into multiple Python functions that the Dagster orchestration framework ties together. In production, the @ops will run on separate Kubernetes nodes. Here's an example:

from typing import List

from dagster import DynamicOut, DynamicOutput, job, op


@op(
    out=DynamicOut(str),
)
def find_small_data_sets(context):
    """Starts each dataset going through the pipeline."""
    datasets = db.list_some_things()
    for dataset in datasets:
        yield DynamicOutput(value=dataset)


@op
def process_data_part_one(data: str) -> str:
    pass # Do some work on one of the data sets.


@op
def process_data_part_two(data: str) -> int:
    # Do more work on a data set.
    # Conceptually this would be part of the same span
    # as process_data_part_one.
    pass

@op
def workflow_done(outputs: List[int]) -> int:
    # Finish up the workflow. Here is where a workflow-level
    # span might end.
    return sum(outputs)


@job
def do_full_orchestrated_job():
    """This function defines the DAG structure.

    It does not perform the actual runtime execution of my job
    when it gets called.
    """
    datasets = find_small_data_sets()
    processed_datasets = (
        datasets
        .map(process_data_part_one)
        .map(process_data_part_two)
    )
    workflow_done(processed_datasets.collect())

Given that I can't instrument the Dagster orchestration framework itself, is there a way to use OpenTelemetry to monitor my pipeline? Is it possible to start and end spans in different functions (without a context manager), especially if the start and end actually run on different CPUs? Or is there a better tool for this kind of monitoring/alerting?

Thanks for reading!

You don't have to use a context manager when working with spans. You can start a span in one function and end it in another. Below are some example snippets showing how you can do that.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)


tracer = trace.get_tracer(__name__)

# A single span started in one function and ended in another

def foo(span):
    span.set_attribute("my_custom_attribue", "foo")
    span.end()

def bar():
    span = tracer.start_span("bar")
    foo(span)

# end

# demonstrate `use_span(...)`

def bob(span):
    with trace.use_span(span, end_on_exit=False):
        # use the passed-in span as the parent of the nested "internal" span
        with tracer.start_as_current_span("internal") as internal:
            internal.set_attribute("k", "v")

    # `span` is still not ended because `end_on_exit` is set to False.
    # You may pass the span around to different functions before ending it.

    # ending manually
    span.end()

def alice():
    span = tracer.start_span("alice")
    bob(span)

# end

if __name__ == '__main__':
    bar()
    alice()

I ended up using custom trace propagation, so that the Dagster job has one shared trace and each op then has its own span. So I didn't (de)serialize spans, just the trace context. See the conceptual propagation docs and the Python propagation docs.

A simple example of trace context propagation in Python:

from opentelemetry import context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


def get_trace_context():
    """Returns a carrier dict holding the current trace context."""
    carrier = {}
    TraceContextTextMapPropagator().inject(carrier)
    return carrier


def set_trace_context(carrier):
    """Sets the current trace context from the given propagated trace context carrier."""
    ctx = TraceContextTextMapPropagator().extract(carrier)
    context.attach(ctx)
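
The carrier is just a small dict (the W3C `traceparent` header), so it can be serialized and handed to code running in another process, which then attaches it and creates child spans in the same trace. A minimal illustration, assuming a tracer configured as in the answer above:

# Assumes: tracer = trace.get_tracer(__name__) with a configured TracerProvider.
with tracer.start_as_current_span("whole-workflow"):
    carrier = get_trace_context()
    # carrier looks like {'traceparent': '00-<trace-id>-<span-id>-01'} and can
    # be JSON-serialized and passed to another process or node.

# ...later, possibly in a different process, after deserializing the carrier...
set_trace_context(carrier)
with tracer.start_as_current_span("one-op"):
    pass  # recorded as part of the same trace as "whole-workflow"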

For Dagster specifically, there isn't a built-in mechanism that works well for storing the trace context. Resources are re-initialized for each op execution context (e.g., per process with the default multiprocess executor), so a resource can't create a trace context that is shared by all ops; you end up with separate traces. So I built something custom that stores and fetches the trace context based on the run ID (or a parent run ID). Here's the snippet that walks up the run ID ancestry:

def _get_run_id_and_ancestors(context):
    """Returns a list starting with the current run's ID and then any ancestor runs.
    """
    run_id = context.run_id
    run_ids = [run_id]
    while True:
        run = context.instance.get_run_by_id(run_id)
        if run.parent_run_id:
            run_id = run.parent_run_id
            run_ids.append(run_id)
        else:
            break
    return run_ids
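
To sketch how these pieces can fit together with the helpers above (simplified; `save_trace_context` and `load_trace_context` below are placeholders for whatever shared store you use, not a real implementation): a small wrapper looks up the carrier stored for the run or one of its ancestors, attaches it before starting the op's span, and publishes the carrier if this is the first op of the run.

from contextlib import contextmanager

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

# Placeholder store mapping run_id -> carrier dict. A real deployment needs
# storage that every op process can reach; an in-process dict only
# illustrates the shape of the code.
_TRACE_CONTEXT_STORE = {}


def save_trace_context(run_id, carrier):
    _TRACE_CONTEXT_STORE[run_id] = carrier


def load_trace_context(run_ids):
    """Returns the first carrier stored for the run or any of its ancestors."""
    for run_id in run_ids:
        if run_id in _TRACE_CONTEXT_STORE:
            return _TRACE_CONTEXT_STORE[run_id]
    return None


@contextmanager
def op_span(op_context, name):
    """Wraps an op's work in a span that joins the run's shared trace."""
    run_ids = _get_run_id_and_ancestors(op_context)
    carrier = load_trace_context(run_ids)
    if carrier is not None:
        set_trace_context(carrier)  # join the trace started earlier in this run
    with tracer.start_as_current_span(name) as span:
        if carrier is None:
            # First op to execute for this run: publish the trace context so
            # later ops (possibly in other processes) join the same trace.
            save_trace_context(run_ids[0], get_trace_context())
        yield span

An op body can then wrap its work in `with op_span(context, "process_data_part_one") as span:`, so every op's span lands in the same per-run trace, and the per-run duration and per-workflow counts can be derived from those spans for alerting.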