如何使用开放遥测跟踪两个异步 Go 例程

How to trace two asynchronous go routines with open telemetry

我正在尝试使用 Open Telemetry 跟踪具有两个 Go 例程的方法。第一个 Go 例程从 Kafka 读取并创建一个持久的作业(可能需要 1 秒到 1 分钟)。然后,第二个 Go 例程监听完成的作业。

进行跟踪的正确方法是什么,以便我们知道哪个作业结果(在第二个例程中)对应于哪个 kafka 消息(来自第一个例程)?

我的猜测是 Go 例程中创建的两个跨度必须通过相同的 traceId 链接。

func startListening(ctx context.Context) {
  // initialise kafka client

  go kafkaConsumeMessages(ctx)
  go waitForJob(ctx)
}

func kafkaConsumeMessages(ctx) {
  for message := range kafkaEvents {
    // process message, create long job
    // create span here with traceID?
  }  

func waitForJobs(ctx) {
  for results := range finishedJobs
    // process result
    // create span here with traceID?
  }
}

非常感谢任何建议!

答案实际上比我想象的要简单。您需要进一步传递附加到该长作业的跟踪信息,然后在处理完成的作业时对其进行解码。

在我的例子中,因为我使用的是文本 traceparent header,因此 propagation.TraceContext{} 实现 propagation.TextMapPropagator,我决定发送整个 traceparent header(尽管我可能需要对 tracestate 做同样的事情)然后在处理完成的作业时使用 Extract 方法解码 header。但是为了使用Extract方法,你需要实现propagation.TextMapCarrier接口。

func startListening(ctx context.Context) {
  // initialise kafka client

  go kafkaConsumeMessages(ctx)
  go waitForJob(ctx)
}

func kafkaConsumeMessages(ctx) {
  for msg := range kafkaEvents {
    // extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
    ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))

    // create span 
    tr := otel.Tracer("consumer")
    _, span := tr.Start(ctx, "consume message", trace.WithAttributes(
        semconv.MessagingOperationProcess,
    ))
    defer span.End()
   
    // get just the traceparent header
    carrier := otelsarama.NewConsumerMessageCarrier(&msg)
    traceparentHeader := carrier.Get("traceparent")

    // process message, create long job and attach the header
    jobs.enqueue{TraceparentHeader: traceparentHeader}
  }  

func waitForJobs(ctx) {
  for result := range finishedJobs {
    ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
    ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
        attribute.String("jobName", result.JobName),
    ))
    defer span.End()
 
    // do more work 
  }
}
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
    S string
}

func (c PseudoCarrier) Get(_ string) string {
    return c.S
}

func (c PseudoCarrier) Set(string, string) {}

func (c PseudoCarrier) Keys() []string {
    return []string{"traceparent"}
}