如何使用开放遥测跟踪两个异步 Go 例程
How to trace two asynchronous go routines with open telemetry
我正在尝试使用 Open Telemetry 跟踪具有两个 Go 例程的方法。第一个 Go 例程从 Kafka 读取并创建一个持久的作业(可能需要 1 秒到 1 分钟)。然后,第二个 Go 例程监听完成的作业。
进行跟踪的正确方法是什么,以便我们知道哪个作业结果(在第二个例程中)对应于哪个 kafka 消息(来自第一个例程)?
我的猜测是 Go 例程中创建的两个跨度必须通过相同的 traceId 链接。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for message := range kafkaEvents {
// process message, create long job
// create span here with traceID?
}
func waitForJobs(ctx) {
for results := range finishedJobs
// process result
// create span here with traceID?
}
}
非常感谢任何建议!
答案实际上比我想象的要简单。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。
在我的例子中,因为我使用的是文本 traceparent
header,因此 propagation.TraceContext{}
实现 propagation.TextMapPropagator
,我决定发送整个 traceparent
header(尽管我可能需要对 tracestate
做同样的事情)然后在处理完成的作业时使用 Extract
方法解码 header。但是为了使用Extract
方法,你需要实现propagation.TextMapCarrier
接口。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for msg := range kafkaEvents {
// extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
// create span
tr := otel.Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
// get just the traceparent header
carrier := otelsarama.NewConsumerMessageCarrier(&msg)
traceparentHeader := carrier.Get("traceparent")
// process message, create long job and attach the header
jobs.enqueue{TraceparentHeader: traceparentHeader}
}
func waitForJobs(ctx) {
for result := range finishedJobs {
ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
attribute.String("jobName", result.JobName),
))
defer span.End()
// do more work
}
}
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
S string
}
func (c PseudoCarrier) Get(_ string) string {
return c.S
}
func (c PseudoCarrier) Set(string, string) {}
func (c PseudoCarrier) Keys() []string {
return []string{"traceparent"}
}
我正在尝试使用 Open Telemetry 跟踪具有两个 Go 例程的方法。第一个 Go 例程从 Kafka 读取并创建一个持久的作业(可能需要 1 秒到 1 分钟)。然后,第二个 Go 例程监听完成的作业。
进行跟踪的正确方法是什么,以便我们知道哪个作业结果(在第二个例程中)对应于哪个 kafka 消息(来自第一个例程)?
我的猜测是 Go 例程中创建的两个跨度必须通过相同的 traceId 链接。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for message := range kafkaEvents {
// process message, create long job
// create span here with traceID?
}
func waitForJobs(ctx) {
for results := range finishedJobs
// process result
// create span here with traceID?
}
}
非常感谢任何建议!
答案实际上比我想象的要简单。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。
在我的例子中,因为我使用的是文本 traceparent
header,因此 propagation.TraceContext{}
实现 propagation.TextMapPropagator
,我决定发送整个 traceparent
header(尽管我可能需要对 tracestate
做同样的事情)然后在处理完成的作业时使用 Extract
方法解码 header。但是为了使用Extract
方法,你需要实现propagation.TextMapCarrier
接口。
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for msg := range kafkaEvents {
// extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
// create span
tr := otel.Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
// get just the traceparent header
carrier := otelsarama.NewConsumerMessageCarrier(&msg)
traceparentHeader := carrier.Get("traceparent")
// process message, create long job and attach the header
jobs.enqueue{TraceparentHeader: traceparentHeader}
}
func waitForJobs(ctx) {
for result := range finishedJobs {
ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
attribute.String("jobName", result.JobName),
))
defer span.End()
// do more work
}
}
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
S string
}
func (c PseudoCarrier) Get(_ string) string {
return c.S
}
func (c PseudoCarrier) Set(string, string) {}
func (c PseudoCarrier) Keys() []string {
return []string{"traceparent"}
}