Spring Cloud Sleuth trace ID propagation via Spring Cloud Stream Binder for AWS Kinesis
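I'm trying to make sure the Spring Cloud Sleuth trace ID is sent through Kinesis as a header, based on what I read here. If I'm reading the Spring Cloud Kinesis binder documentation (available here) correctly, the configuration and the producer and consumer code below should allow the X-B3-TraceId header to travel from the producer to the consumer. However, this currently does not work: the header is not available on the consumer side. With the AWS Kinesis binder, how can this header be passed from the producer to the consumer? What is missing from my configuration and code below?
Spring Boot: 2.6.7
Spring Cloud: 2021.0.1
Spring Cloud Stream Binder for AWS Kinesis: 2.2.0
Configuration: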
spring:
  cloud:
    stream:
      kinesis:
        binder:
          headers:
            - X-B3-TraceId
Producer:
accountChannels
    .accountRequest()
    .send(
        MessageBuilder.withPayload(accountEvent)
            .setHeader("X-B3-TraceId", getTraceId())
            .build());
...
Consumer:
@StreamListener(ApplicationChannels.ACCOUNT_REQUEST)
public void processAccountRequest(
        final Message<AccountEvent> message,
        @Header(AwsHeaders.CHECKPOINTER) final IRecordProcessorCheckpointer checkpointer,
        @Header("X-B3-TraceId") final String traceId) {
    ...
Result of logging the MessageHeaders in the consumer:
Received message header b3 and value 3e6c8447beeff928-3e6c8447beeff928-0
Received message header nativeHeaders and value {b3=[3e6c8447beeff928-3e6c8447beeff928-0]}
Received message header x-b3-traceid and value 304f6cc831322a5c
Received message header aws_shard and value shardId-000000000000
Received message header aws_checkpointer and value com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer@338f0685
Received message header id and value 7754e5ff-e09d-886c-a425-8e0068a57a89
Received message header sourceData and value UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49625973706469884904499220492564923628215483380266958850, getData()=java.nio.HeapByteBuffer[pos=0 lim=171 cap=171], getPartitionKey()=8162231937]
Received message header contentType and value application/vnd.accountevent.v1+avro
Received message header aws_receivedPartitionKey and value 8162231937
Received message header aws_receivedStream and value account-request
Received message header aws_receivedSequenceNumber and value 49625973706469884904499220492564923628215483380266958850
Received message header timestamp and value 1650915186028
I have a Spring Cloud application like this:
application.properties
logging.level.root=warn
spring.sleuth.tracer.mode=brave
spring.cloud.stream.bindings.input.destination=test-stream
spring.cloud.stream.bindings.input.group=my-group
spring.cloud.stream.bindings.output.destination=test-stream
spring.cloud.stream.kinesis.binder.headers=X-B3-TraceId
So72003034Application
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class So72003034Application {

    public static void main(String[] args) {
        SpringApplication.run(So72003034Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void processAccountRequest(
            Message<String> message,
            @Header("X-B3-TraceId") final String traceId) {
        System.out.println("Consumer trace: " + traceId);
    }
}
So72003034ApplicationTests
@SpringBootTest(properties = "cloud.aws.region.static=us-east-1")
@Testcontainers(disabledWithoutDocker = true)
@DirtiesContext
class So72003034ApplicationTests {

    @Container
    public static LocalStackContainer localStack =
            new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
                    .withServices(LocalStackContainer.Service.KINESIS);

    @Autowired
    Source source;

    @Autowired
    Tracer tracer;

    @Test
    void testTracingPropagation() throws InterruptedException {
        String traceId = this.tracer.newTrace().context().traceIdString();
        System.out.println("Producer trace: " + traceId);
        this.source.output()
                .send(MessageBuilder.withPayload("tet")
                        .setHeader("X-B3-TraceId", traceId)
                        .build());
        // Give the consumer time to receive the record before the test ends.
        Thread.sleep(10_000);
    }

    @TestConfiguration
    public static class LocalStackConfiguration {

        @Bean
        public AmazonKinesisAsync amazonKinesis() {
            return AmazonKinesisAsyncClientBuilder.standard()
                    .withEndpointConfiguration(localStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
                    .withCredentials(localStack.getDefaultCredentialsProvider())
                    .build();
        }

        @Bean
        public LockRegistry lockRegistry() {
            return new DefaultLockRegistry();
        }

        @Bean
        public ConcurrentMetadataStore checkpointStore() {
            return new SimpleMetadataStore();
        }
    }
}
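I see the X-B3-TraceId header propagated from the producer to the consumer through the Kinesis stream.
I cannot confirm this with the KCL enabled, because its producer library no longer works on Windows.
In debug mode I also see that the received message carries a b3 header. That is probably the one Spring Cloud Sleuth populates automatically.
UPDATE
The working strategy is to propagate the whole b3 header:
spring.cloud.stream.kinesis.binder.headers=b3
My producer looks like this, only because there is no trace in progress yet: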
ScopedSpan scopedSpan = this.tracer.startScopedSpan("test");
String traceId = scopedSpan.context().traceId();
System.out.println("Producer trace: " + traceId);
this.source.output().send(new GenericMessage<>("test"));
scopedSpan.end();
This way the TracingChannelInterceptor can pick up the trace from the ThreadLocal, start a new span, and set it into the b3 header.
On the consumer side I have:
@StreamListener(Sink.INPUT)
public void processAccountRequest(Message<String> message,
        @Header("b3") final String trace) {
    System.out.println("Consumer b3: " + trace);
}
This indeed shows the same trace ID as the one started on the producer side:
Producer trace: 957fd8316345f39f
Consumer b3: 957fd8316345f39f-44810ee5bf7cb0b7-0
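If the consumer only needs the trace ID, it can be split out of the b3 value shown above. In the B3 single-header format the value is laid out as traceId-spanId, optionally followed by a sampling state and a parent span ID. The sketch below is a minimal illustration under that assumption; B3SingleHeader is a hypothetical helper class, not part of Sleuth or Brave, and it deliberately ignores sampling-only values such as "0" or "d".

```java
import java.util.Optional;

// Hypothetical helper (not a Sleuth or Brave class): splits a b3 single-header value.
public final class B3SingleHeader {

    private final String traceId;
    private final String spanId;
    private final String samplingState; // null when the header has no sampling field

    private B3SingleHeader(String traceId, String spanId, String samplingState) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.samplingState = samplingState;
    }

    // Assumes the layout traceId-spanId[-samplingState[-parentSpanId]].
    // Simplified on purpose: sampling-only values are rejected.
    public static B3SingleHeader parse(String value) {
        String[] parts = value.split("-");
        if (parts.length < 2 || parts.length > 4) {
            throw new IllegalArgumentException("Unexpected b3 value: " + value);
        }
        String sampling = parts.length >= 3 ? parts[2] : null;
        return new B3SingleHeader(parts[0], parts[1], sampling);
    }

    public String traceId() {
        return traceId;
    }

    public String spanId() {
        return spanId;
    }

    public Optional<String> samplingState() {
        return Optional.ofNullable(samplingState);
    }

    public static void main(String[] args) {
        // Value taken from the consumer output above.
        B3SingleHeader b3 = parse("957fd8316345f39f-44810ee5bf7cb0b7-0");
        System.out.println("Trace ID: " + b3.traceId());
        System.out.println("Span ID: " + b3.spanId());
    }
}
```

In the listener, the value received through @Header("b3") could be passed to B3SingleHeader.parse(...) and its traceId() compared with the ID printed by the producer.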