是否可以在 python 中获取开放遥测跟踪以记录 ThreadPoolExecutor 中发生的跨度?
Is it possible to get open telemetry tracing in python to record spans that happen within a ThreadPoolExecutor?
我打开了遥测 运行 一个 python FastApi 应用程序。跟踪正在发送到 Jaeger,我可以查看它们。
我有一堆 IO 密集型工作正在完成,所以我正在与 ThreadPoolExecutor 并行进行。在 ThreadPoolExecutor 执行的函数中创建的跨度没有进入 Jaeger。
任何人都可以为我指明正确的方向以使其正常工作吗?目前我正在使用禁用并发来记录性能调试的跟踪,但这在生产中不起作用。
我花了更多时间来深入研究这个问题并创建了这个 class 来解决这个问题:
from opentelemetry import context as otel_context
class TracedThreadPoolExecutor(ThreadPoolExecutor):
"""Implementation of :class:`ThreadPoolExecutor` that will pass context into sub tasks."""
def __init__(self, tracer: Tracer, *args, **kwargs):
self.tracer = tracer
super().__init__(*args, **kwargs)
def with_otel_context(self, context: otel_context.Context, fn: Callable):
otel_context.attach(context)
return fn()
def submit(self, fn, *args, **kwargs):
"""Submit a new task to the thread pool."""
# get the current otel context
context = otel_context.get_current()
if context:
return super().submit(
lambda: self.with_otel_context(
context, lambda: fn(*args, **kwargs)
),
)
else:
return super().submit(lambda: fn(*args, **kwargs))
这个class可以这样使用:
from opentelemetry import trace
import multiprocessing
tracer = trace.get_tracer(__name__)
executor = TracedThreadPoolExecutor(tracer, max_workers=multiprocessing.cpu_count())
# from here you can use it as you would a regular ThreadPoolExecutor
result = executor.submit(some_work)
executor.shutdown(wait=True)
# do something with the result
我打开了遥测 运行 一个 python FastApi 应用程序。跟踪正在发送到 Jaeger,我可以查看它们。
我有一堆 IO 密集型工作正在完成,所以我正在与 ThreadPoolExecutor 并行进行。在 ThreadPoolExecutor 执行的函数中创建的跨度没有进入 Jaeger。
任何人都可以为我指明正确的方向以使其正常工作吗?目前我正在使用禁用并发来记录性能调试的跟踪,但这在生产中不起作用。
我花了更多时间来深入研究这个问题并创建了这个 class 来解决这个问题:
from opentelemetry import context as otel_context
class TracedThreadPoolExecutor(ThreadPoolExecutor):
"""Implementation of :class:`ThreadPoolExecutor` that will pass context into sub tasks."""
def __init__(self, tracer: Tracer, *args, **kwargs):
self.tracer = tracer
super().__init__(*args, **kwargs)
def with_otel_context(self, context: otel_context.Context, fn: Callable):
otel_context.attach(context)
return fn()
def submit(self, fn, *args, **kwargs):
"""Submit a new task to the thread pool."""
# get the current otel context
context = otel_context.get_current()
if context:
return super().submit(
lambda: self.with_otel_context(
context, lambda: fn(*args, **kwargs)
),
)
else:
return super().submit(lambda: fn(*args, **kwargs))
这个class可以这样使用:
from opentelemetry import trace
import multiprocessing
tracer = trace.get_tracer(__name__)
executor = TracedThreadPoolExecutor(tracer, max_workers=multiprocessing.cpu_count())
# from here you can use it as you would a regular ThreadPoolExecutor
result = executor.submit(some_work)
executor.shutdown(wait=True)
# do something with the result