是否可以在 python 中获取开放遥测跟踪以记录 ThreadPoolExecutor 中发生的跨度?

Is it possible to get open telemetry tracing in python to record spans that happen within a ThreadPoolExecutor?

我打开了遥测 运行 一个 python FastApi 应用程序。跟踪正在发送到 Jaeger,我可以查看它们。

我有一堆 IO 密集型工作正在完成,所以我正在与 ThreadPoolExecutor 并行进行。在 ThreadPoolExecutor 执行的函数中创建的跨度没有进入 Jaeger。

任何人都可以为我指明正确的方向以使其正常工作吗?目前我正在使用禁用并发来记录性能调试的跟踪,但这在生产中不起作用。

我花了更多时间来深入研究这个问题并创建了这个 class 来解决这个问题:

from opentelemetry import context as otel_context

class TracedThreadPoolExecutor(ThreadPoolExecutor):
    """Implementation of :class:`ThreadPoolExecutor` that will pass context into sub tasks."""

    def __init__(self, tracer: Tracer, *args, **kwargs):
        self.tracer = tracer
        super().__init__(*args, **kwargs)

    def with_otel_context(self, context: otel_context.Context, fn: Callable):
        otel_context.attach(context)
        return fn()

    def submit(self, fn, *args, **kwargs):
        """Submit a new task to the thread pool."""

        # get the current otel context
        context = otel_context.get_current()
        if context:
            return super().submit(
                lambda: self.with_otel_context(
                    context, lambda: fn(*args, **kwargs)
                ),
            )
        else:
            return super().submit(lambda: fn(*args, **kwargs))

这个class可以这样使用:

from opentelemetry import trace
import multiprocessing

tracer = trace.get_tracer(__name__)
executor = TracedThreadPoolExecutor(tracer, max_workers=multiprocessing.cpu_count())

# from here you can use it as you would a regular ThreadPoolExecutor

result = executor.submit(some_work)
executor.shutdown(wait=True)
# do something with the result