How to intercept the headers from a call to one service and insert them into another request in gRPC-java?

I have two servers: HelloServer and WorldServer.

Both implement the same proto file:

// The greeting service definition.
service GreeterService {
    // Sends a greeting
    rpc GreetWithHelloOrWorld (GreeterRequest) returns (GreeterReply) {}
    rpc GreetWithHelloWorld (GreeterRequest) returns (GreeterReply) {}
}

message GreeterRequest {
    string id = 1;
}

// The response message containing the greetings
message GreeterReply {
    string message = 1;
    string id = 2;
}

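I want to add traceIds to the requests. As I understand it, this is done by adding the traceId to the Metadata object.

Here is the test I use to check that the traceId is passed along. A request is made to HelloServer, which in turn calls WorldServer and finally returns the response.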

@Test
public void greetHelloWorld() {
    String traceId = UUID.randomUUID().toString();
    Metadata metadata = new Metadata();
    metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);

    Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(traceId).build();

    ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", 8080)
        .usePlaintext(true)
        .build();

    AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
    AtomicReference<Metadata> headersCapture = new AtomicReference<>();
    ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
    ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);

    GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    GreeterServiceStub asyncStub = GreeterServiceGrpc.newStub(channel);

    try {
        Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloWorld(greeterRequest);
        String idInResponse = greeterReply.getId();
        String idInHeaders = headersCapture.get().get(MetadataKeys.TRACE_ID_METADATA_KEY);
        logger.info("Response from HelloService and WorldService  -- {}, id = {}, headers = {}", greeterReply.getMessage(), idInResponse, idInHeaders);
        assertEquals("Ids in response and header did not match", idInResponse, idInHeaders);
    } catch (StatusRuntimeException e) {
        logger.warn("Exception when calling HelloService and WorldService\n" +  e);
        fail();
    } finally {
        channel.shutdown();
    }
}

The ServerInterceptor implementation:

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {

    String traceId = headers.get(MetadataKeys.TRACE_ID_METADATA_KEY);
    logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 1=" + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
    return next.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata headers) {
            // Metadata.put rejects null values, so only echo the trace id when the request carried one.
            if (traceId != null) {
                headers.put(MetadataKeys.TRACE_ID_METADATA_KEY, traceId);
            }
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " Trace id -- 2  " + headers.get(MetadataKeys.TRACE_ID_METADATA_KEY));
            super.sendHeaders(headers);
        }

        @Override
        public void sendMessage(RespT message) {
            logger.info("objId=" + this.toString().substring(this.toString().lastIndexOf('@')) + " message=" + message.toString());
            super.sendMessage(message);
        }
    }, headers);
}

Here is the implementation of the greetWithHelloWorld() method:

public void greetWithHelloWorld(com.comcast.manitoba.world.hello.Greeter.GreeterRequest request,
                                        io.grpc.stub.StreamObserver<com.comcast.manitoba.world.hello.Greeter.GreeterReply> responseObserver) {
        Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
    
        Metadata metadata = new Metadata();
        metadata.put(MetadataKeys.TRACE_ID_METADATA_KEY, "");
    
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8081)
                .usePlaintext(true).build();
    
        AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
        AtomicReference<Metadata> headersCapture = new AtomicReference<>();
        ClientInterceptor clientInterceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
        ClientInterceptor metadataCapturingClientInterceptor = MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture);
    
        GreeterServiceGrpc.GreeterServiceBlockingStub blockingStub = GreeterServiceGrpc.newBlockingStub(ClientInterceptors.intercept(channel, clientInterceptor, metadataCapturingClientInterceptor));
    
        String messageFromWorldService = "";
        String replyIdFromWorldService = "";
        try {
            Greeter.GreeterReply greeterReply = blockingStub.greetWithHelloOrWorld(greeterRequest);
            messageFromWorldService = greeterReply.getMessage();
            replyIdFromWorldService = greeterReply.getId();
            logger.info("Response from WorldService  -- {}, id = {}", messageFromWorldService, replyIdFromWorldService);
        } catch (StatusRuntimeException e) {
            logger.warn("Exception when calling WorldService\n" +  e);
        }
    
        Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService).setId(replyIdFromWorldService).build();
        responseObserver.onNext(greeterReply);
        responseObserver.onCompleted();
    }

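The problem is that inside the greetWithHelloWorld() method I have no access to the Metadata, so I cannot extract the traceId from the headers and attach it to the request to WorldServer. However, if I put a breakpoint in that method, I can see that the request object does contain the traceId, but it is private and not accessible.

Is there any way to achieve this? Also, is this the best way to pass traceIds around? I found some references to using Context. What is the difference between Context and Metadata?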

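The intended approach is to use a ClientInterceptor and a ServerInterceptor. The client interceptor copies the value from the Context into the Metadata. The server interceptor copies it from the Metadata into the Context. In the server interceptor, use Contexts.interceptCall so that the Context is applied to all callbacks.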
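As a rough illustration of that interceptor pair, here is a minimal sketch. It assumes a String-valued header named "trace-id"; the class name TraceIdInterceptors, the nested class names, and both keys are hypothetical stand-ins (the question's MetadataKeys.TRACE_ID_METADATA_KEY would play the role of the metadata key here).

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

final class TraceIdInterceptors {
    // Assumed header name; substitute the key your services already share.
    static final Metadata.Key<String> TRACE_ID_METADATA_KEY =
            Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);

    // In-process carrier for the trace id (the string is only a debug label).
    static final Context.Key<String> TRACE_ID_CTX_KEY = Context.key("trace-id");

    /** Server side: copy the trace id from the incoming Metadata into the Context. */
    static final class ServerSide implements ServerInterceptor {
        @Override
        public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
            String traceId = headers.get(TRACE_ID_METADATA_KEY);
            Context ctx = (traceId == null)
                    ? Context.current()
                    : Context.current().withValue(TRACE_ID_CTX_KEY, traceId);
            // Contexts.interceptCall attaches ctx around every listener callback,
            // so the service method runs with the trace id available.
            return Contexts.interceptCall(ctx, call, headers, next);
        }
    }

    /** Client side: copy the trace id from the current Context into the outgoing Metadata. */
    static final class ClientSide implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                    next.newCall(method, callOptions)) {
                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    // Reads the Context of the thread that starts the call.
                    String traceId = TRACE_ID_CTX_KEY.get();
                    if (traceId != null) {
                        headers.put(TRACE_ID_METADATA_KEY, traceId);
                    }
                    super.start(responseListener, headers);
                }
            };
        }
    }
}
```

With something like this in place, HelloServer would register ServerSide (for example via ServerInterceptors.intercept) and wrap its channel to WorldServer with ClientInterceptors.intercept(channel, new TraceIdInterceptors.ClientSide()). The greetWithHelloWorld() method then no longer needs to build a Metadata object itself, as long as the outgoing call is started on a thread that carries the request's Context.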

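Metadata is for wire-level propagation. Context is for in-process propagation. Generally, the application does not need to interact with Metadata directly (in Java).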
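To make the in-process side concrete, the following small sketch shows a value travelling through io.grpc.Context without being passed as a parameter. The class name ContextPropagationDemo, the key label, and the sample id are made up for illustration; no network call or Metadata is involved.

```java
import io.grpc.Context;

final class ContextPropagationDemo {
    // Hypothetical key; the string is only a debug label.
    static final Context.Key<String> TRACE_ID = Context.key("trace-id");

    // Code deep in the call stack reads the value from the current Context.
    static String currentTraceId() {
        return TRACE_ID.get(); // null when no value is attached
    }

    // Attach a Context carrying the trace id, run some code, then restore the previous Context.
    static String traceIdSeenInside(String traceId) {
        Context scoped = Context.current().withValue(TRACE_ID, traceId);
        Context previous = scoped.attach();
        try {
            return currentTraceId();
        } finally {
            scoped.detach(previous);
        }
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenInside("abc-123")); // prints "abc-123"
        System.out.println(currentTraceId());             // prints "null": nothing attached here
    }
}
```

The current Context is tracked per thread, so work handed to another thread or executor does not see the value unless the Context is carried over explicitly, for example with Context.current().wrap(runnable) or Context.currentContextExecutor(executor).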