How to intercept the headers from a call to one service and insert them into another request in gRPC-java?
I have two servers, HelloServer and WorldServer.
Both implement the same proto file:
// The greeting service definition.
service GreeterService {
// Sends a greeting
rpc GreetWithHelloOrWorld (GreeterRequest) returns (GreeterReply) {}
rpc GreetWithHelloWorld (GreeterRequest) returns (GreeterReply) {}
}
message GreeterRequest {
string id = 1;
}
// The response message containing the greetings
message GreeterReply {
string message = 1;
string id = 2;
}
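I want to add trace IDs to the requests. As I understand it, this is done by adding the traceId to the Metadata object.
Here is the test I use to check whether the traceId is passed along. It sends a request to HelloServer, which in turn calls WorldServer and finally returns the response.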
@Test
public void greetHelloWorld() {
String traceId = UUID.randomUUID().toString();
Metadata metadata = new Metadata();
metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(traceId).build();
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 8080)
.usePlaintext(true)
.build();
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);
GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
GreeterServiceStub asyncStub = GreeterServiceGrpc.newStub(channel);
try {
Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloWorld(greeterRequest);
String idInResponse = greeterReply.getId();
String idInHeaders = headersCapture.get().get(MetadataKeys.TRACE_ID_METADATA_KEY);
logger.info("Response from HelloService and WorldService -- {}, id = {}, headers = {}", greeterReply.getMessage(), idInResponse, idInHeaders);
assertEquals("Ids in response and header did not match", idInResponse, idInHeaders);
} catch (StatusRuntimeException e) {
logger.warn("Exception when calling HelloService and WorldService\n" + e);
fail();
} finally {
channel.shutdown();
}
}
Implementation of the ServerInterceptor:
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String traceId = headers.get(MetadataKeys.TRACE_ID_METADATA_KEY);
logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 1=" + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata headers) {
headers.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 2 " + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
super.sendHeaders(headers);
}
@Override
public void sendMessage(RespT message) {
logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " message=" + message.toString());
super.sendMessage(message);
}
}, headers);
}
Here is the implementation of the greetWithHelloWorld() method:
public void greetWithHelloWorld(com.comcast.manitoba.world.hello.Greeter.GreeterRequest request,
io.grpc.stub.StreamObserver<com.comcast.manitoba.world.hello.Greeter.GreeterReply> responseObserver) {
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
Metadata metadata = new Metadata();
metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, "");
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081)
.usePlaintext(true).build();
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);
GreeterServiceGrpc.GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
String messageFromWorldService = "";
String replyIdFromWorldService = "";
try {
Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloOrWorld(greeterRequest);
messageFromWorldService = greeterReply.getMessage();
replyIdFromWorldService = greeterReply.getId();
logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, replyIdFromWorldService);
} catch (StatusRuntimeException e) {
logger.warn("Exception when calling HelloService\n" + e);
}
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService).setId(replyIdFromWorldService).build();
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();
}
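The problem is that inside greetWithHelloWorld() I have no access to the Metadata, so I cannot extract the traceId from the headers and attach it to the request to WorldServer. However, if I put a breakpoint in that method, I can see that the request object does contain the traceId, but it is private and not accessible.
Is there any way to achieve this? Also, is this the best way to pass trace IDs around? I found some references to using Context. What is the difference between Context and Metadata?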
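The intended approach is to use a ClientInterceptor and a ServerInterceptor. The client interceptor copies from the Context into the Metadata. The server interceptor copies from the Metadata into the Context. Use Contexts.interceptCall in the server interceptor so that the Context is applied to all callbacks.
Metadata is for wire-level propagation. Context is for in-process propagation. Generally, the application should not need to interact with Metadata directly (in Java).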
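The pair of interceptors described above might look roughly like the following. This is only a minimal sketch, not the asker's actual code: the class name TraceIdInterceptors, the header name "trace-id", and both key constants are assumptions made for illustration (the question's own MetadataKeys.TRACE_ID_METADATA_KEY could be used in place of the metadata key defined here). It also assumes the outgoing call is made on the same thread that runs the service method, so that Context.current() is the Context attached by the server interceptor.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

final class TraceIdInterceptors {
    // Hypothetical keys: the header name "trace-id" is an assumption.
    static final Metadata.Key<String> TRACE_ID_METADATA_KEY =
            Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);
    static final Context.Key<String> TRACE_ID_CTX_KEY = Context.key("trace-id");

    /** Server side: copy the trace id from the incoming Metadata into the Context. */
    static final class ServerSide implements ServerInterceptor {
        @Override
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call, Metadata headers,
                ServerCallHandler<ReqT, RespT> next) {
            String traceId = headers.get(TRACE_ID_METADATA_KEY);
            Context ctx = Context.current();
            if (traceId != null) {
                ctx = ctx.withValue(TRACE_ID_CTX_KEY, traceId);
            }
            // Contexts.interceptCall attaches ctx around every listener callback,
            // so the service method sees it via Context.current().
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    }

    /** Client side: copy the trace id from the current Context into outgoing Metadata. */
    static final class ClientSide implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                    next.newCall(method, callOptions)) {
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    // Reads the value from the Context current on the calling thread.
                    String traceId = TRACE_ID_CTX_KEY.get();
                    if (traceId != null) {
                        headers.put(TRACE_ID_METADATA_KEY, traceId);
                    }
                    super.start(responseListener, headers);
                }
            };
        }
    }
}
```

With something like this in place, HelloServer would register the server interceptor on its service (for example through ServerInterceptors.intercept) and wrap the channel to WorldServer with the client interceptor (through ClientInterceptors.intercept). greetWithHelloWorld() would then not need to build a Metadata object at all, and could read the id with TRACE_ID_CTX_KEY.get() if it needs it for logging.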