如何在客户端使用 Java 读取 gRPC 中的元数据
How to read Meta data in gRPC using Java at client side
我正在使用 Java 和 Protoc 3.0 编译器,下面提到了我的原型文件。
https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
syntax = "proto3";
package Telemetry;
// Interface exported by Agent
service OpenConfigTelemetry {
// Request an inline subscription for data at the specified path.
// The device should send telemetry data back on the same
// connection as the subscription request.
rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}
// Terminates and removes an exisiting telemetry subscription
rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}
// Get the list of current telemetry subscriptions from the
// target. This command returns a list of existing subscriptions
// not including those that are established via configuration.
rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}
// Get Telemetry Agent Operational States
rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}
// Return the set of data encodings supported by the device for
// telemetry data
rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
}
// Message sent for a telemetry subscription request
message SubscriptionRequest {
// Data associated with a telemetry subscription
SubscriptionInput input = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
// The below configuration is not defined in Openconfig RPC.
// It is a proposed extension to configure additional
// subscription request features.
SubscriptionAdditionalConfig additional_config = 3;
}
// Data associated with a telemetry subscription
message SubscriptionInput {
// List of optional collector endpoints to send data for
// this subscription.
// If no collector destinations are specified, the collector
// destination is assumed to be the requester on the rpc channel.
repeated Collector collector_list = 1;
}
// Collector endpoints to send data specified as an ip+port combination.
message Collector {
// IP address of collector endpoint
string address = 1;
// Transport protocol port number for the collector destination.
uint32 port = 2;
}
// Data model path
message Path {
// Data model path of interest
// Path specification for elements of OpenConfig data models
string path = 1;
// Regular expression to be used in filtering state leaves
string filter = 2;
// If this is set to true, the target device will only send
// updates to the collector upon a change in data value
bool suppress_unchanged = 3;
// Maximum time in ms the target device may go without sending
// a message to the collector. If this time expires with
// suppress-unchanged set, the target device must send an update
// message regardless if the data values have changed.
uint32 max_silent_interval = 4;
// Time in ms between collection and transmission of the
// specified data to the collector platform. The target device
// will sample the corresponding data (e.g,. a counter) and
// immediately send to the collector destination.
//
// If sample-frequency is set to 0, then the network device
// must emit an update upon every datum change.
uint32 sample_frequency = 5;
}
// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
// limit the number of records sent in the stream
int32 limit_records = 1;
// limit the time the stream remains open
int32 limit_time_seconds = 2;
}
// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
// subscription request.
// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
// Response message to a telemetry subscription creation or
// get request.
SubscriptionResponse response = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
}
// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
// Unique id for the subscription on the device. This is
// generated by the device and returned in a subscription
// request or when listing existing subscriptions
uint32 subscription_id = 1;
}
// 2. Telemetry data send back on the same connection as the
// subscription request.
message OpenConfigData {
// router name:export IP address
string system_id = 1;
// line card / RE (slot number)
uint32 component_id = 2;
// PFE (if applicable)
uint32 sub_component_id = 3;
// Path specification for elements of OpenConfig data models
string path = 4;
// Sequence number, monotonically increasing for each
// system_id, component_id, sub_component_id + path.
uint64 sequence_number = 5;
// timestamp (milliseconds since epoch)
uint64 timestamp = 6;
// List of key-value pairs
repeated KeyValue kv = 7;
}
// Simple Key-value, where value could be one of scalar types
message KeyValue {
// Key
string key = 1;
// One of possible values
oneof value {
double double_value = 5;
int64 int_value = 6;
uint64 uint_value = 7;
sint64 sint_value = 8;
bool bool_value = 9;
string str_value = 10;
bytes bytes_value = 11;
}
}
// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
// Subscription identifier as returned by the device when
// subscription was requested
uint32 subscription_id = 1;
}
// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
// Return code
ReturnCode code = 1;
// Return code string
string code_str = 2;
};
// Result of the operation
enum ReturnCode {
SUCCESS = 0;
NO_SUBSCRIPTION_ENTRY = 1;
UNKNOWN_ERROR = 2;
}
// Message sent for a telemetry get request
message GetSubscriptionsRequest {
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers
uint32 subscription_id = 1;
}
// Reply to telemetry subscription get request
message GetSubscriptionsReply {
// List of current telemetry subscriptions
repeated SubscriptionReply subscription_list = 1;
}
// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
// Per-subscription_id level operational state can be requested.
//
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers including agent-level
// operational stats
// --- or ---
// If subscription_id is not present then sent only agent-level
// operational stats
uint32 subscription_id = 1;
// Control verbosity of the output
VerbosityLevel verbosity = 2;
}
// Verbosity Level
enum VerbosityLevel {
DETAIL = 0;
TERSE = 1;
BRIEF = 2;
}
// Reply to telemetry agent operational states request
message GetOperationalStateReply {
// List of key-value pairs where
// key = operational state definition
// value = operational state value
repeated KeyValue kv = 1;
}
// Message sent for a data encoding request
message DataEncodingRequest {
}
// Reply to data encodings supported request
message DataEncodingReply {
repeated EncodingType encoding_list = 1;
}
// Encoding Type Supported
enum EncodingType {
UNDEFINED = 0;
XML = 1;
JSON_IETF = 2;
PROTO3 = 3;
}
为了执行服务调用 (rpc TelemetrySubscribe),我首先需要读取具有订阅 ID 的 header,然后开始读取消息。现在,使用 Java 我能够连接该服务,我确实引入了拦截器但是当我 print/retrieve header 它是空的。我调用拦截器的代码如下,
ClientInterceptor interceptor = new HeaderClientInterceptor();
originChannel = OkHttpChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
这是读取元数据的拦截器代码。
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));
想知道有没有其他方法可以读取其中包含订阅 ID 的元数据或第一条消息?我只需要读取具有订阅 ID 的第一条消息,并且 return 与服务器的订阅 ID 相同,以便可以开始流式传输我有等效的 Python 代码使用相同的 proto 文件并且它通过代码与服务器通信以下仅供参考:
sub_req = SubscribeRequestMsg("host",port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()
if metadata[0][0] == "responseKey":
metainfo = metadata[0][1]
print metainfo
subreply = agent_pb2.SubscriptionReply()
subreply.SetInParent()
google.protobuf.text_format.Merge(metainfo, subreply)
if subreply.response.subscription_id:
SUB_ID = subreply.response.subscription_id
从上面的 python 代码我可以轻松检索元数据 object,不确定如何使用 Java 检索元数据?
阅读元数据后,我得到的是:Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
但我知道从元数据到它还有一行,即
response {
subscription_id: 2
}
我如何从 Header 中提取最后一个响应,其中包含订阅 ID。我确实尝试了很多选择,但我在这里迷路了。
您使用的方法用于请求元数据,而不是响应元数据:
public void start(Listener<RespT> responseListener, Metadata headers) {
对于响应元数据,您需要 ClientCall.Listener
并等待 onHeaders 回调:
public void onHeaders(Metadata headers)
我确实觉得您提到的元数据的用法似乎很奇怪。元数据通常用于附加错误详细信息或不特定于 RPC 方法的横切功能(如身份验证、跟踪等)。
通常使用 ClientInterceptor 是不方便的,因为您需要维护对它的引用以便将数据拉回。在您的情况下,数据实际上是元数据。可以更轻松地访问元数据的一种方法是将其放在 Context
.
中
例如,您可以为订阅 ID 创建一个 Context.Key
。在您的客户端拦截器中,您可以提取所需的 Metadata
header,并使用 Context.current().withValue(key, metadata)
将其放入 Context
中。在你的 StreamObserver
中,你可以通过调用 key.get(Context.current())
来提取这个 This。这假设您使用的是异步 API,而不是阻塞 API.
之所以更难,是因为通常元数据是关于呼叫的信息,但与呼叫本身没有直接关系。它用于跟踪、编码、统计、取消等类似的事情。如果某些事情改变了你处理请求的方式,它可能需要直接进入请求本身,而不是站在一边。
万一这对其他人有帮助:
我需要根据 request/response 在我的 gRPC-java 服务器中写一个特定的响应 header。
我最后做的是使用 Context::withValue
将响应 header 值存储在 Context
中(它不会修改现有的上下文,而是实际上创建一个新的上下文),然后在 Context::run
回调中调用请求服务处理程序方法的 StreamObserver::onNext
。 StremableObserver::onNext
从我的 ServerInterceptor
调用我的集合中的 ServerCall::sendHeaders
。在 sendHeaders
中,它可以读取我存储的上下文中的值并设置响应 header 值。
我认为这与@carl-mastrangelo的方法类似,只是可以拼写多一点。
public enum MyServerInterceptor implements ServerInterceptor {
INSTANCE;
public static final Metadata.Key<String> METADATA_KEY =
Metadata.Key.of("fish", ASCII_STRING_MARSHALLER);
public static final Context.Key<String> CONTEXT_KEY = Context.key("dog");
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
ServerCall<ReqT, RespT> myServerCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
String value = CONTEXT_KEY.get(Context.current());
responseHeaders.put(METADATA_KEY, value);
super.sendHeaders(responseHeaders);
}
};
return next.startCall(myServerCall, requestHeaders);
}
}
public class MyService extends MyServiceGrpc.MyServiceyImplBase {
@Override
public void serviceMethod(MyRequest request, StreamObserver<MyResponse> responseObserver) {
MyResponse response = new MyResponse();
Context.current().withValue(Context.key("dog"), "cat").run(() -> {
responseObserver.onNext(response);
});
responseObserver.onCompleted();
}
}
这里的关键部分是 responseObserver::onNext
调用 ForwardingServerCall.SimpleForwardingServerCall::sendHeaders
。
如果有更好的方法请告诉我。这一切似乎比我想要的要复杂。
我正在使用 Java 和 Protoc 3.0 编译器,下面提到了我的原型文件。 https://github.com/openconfig/public/blob/master/release/models/rpc/openconfig-rpc-api.yang
syntax = "proto3";
package Telemetry;
// Interface exported by Agent
service OpenConfigTelemetry {
// Request an inline subscription for data at the specified path.
// The device should send telemetry data back on the same
// connection as the subscription request.
rpc telemetrySubscribe(SubscriptionRequest) returns (stream OpenConfigData) {}
// Terminates and removes an exisiting telemetry subscription
rpc cancelTelemetrySubscription(CancelSubscriptionRequest) returns (CancelSubscriptionReply) {}
// Get the list of current telemetry subscriptions from the
// target. This command returns a list of existing subscriptions
// not including those that are established via configuration.
rpc getTelemetrySubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsReply) {}
// Get Telemetry Agent Operational States
rpc getTelemetryOperationalState(GetOperationalStateRequest) returns (GetOperationalStateReply) {}
// Return the set of data encodings supported by the device for
// telemetry data
rpc getDataEncodings(DataEncodingRequest) returns (DataEncodingReply) {}
}
// Message sent for a telemetry subscription request
message SubscriptionRequest {
// Data associated with a telemetry subscription
SubscriptionInput input = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
// The below configuration is not defined in Openconfig RPC.
// It is a proposed extension to configure additional
// subscription request features.
SubscriptionAdditionalConfig additional_config = 3;
}
// Data associated with a telemetry subscription
message SubscriptionInput {
// List of optional collector endpoints to send data for
// this subscription.
// If no collector destinations are specified, the collector
// destination is assumed to be the requester on the rpc channel.
repeated Collector collector_list = 1;
}
// Collector endpoints to send data specified as an ip+port combination.
message Collector {
// IP address of collector endpoint
string address = 1;
// Transport protocol port number for the collector destination.
uint32 port = 2;
}
// Data model path
message Path {
// Data model path of interest
// Path specification for elements of OpenConfig data models
string path = 1;
// Regular expression to be used in filtering state leaves
string filter = 2;
// If this is set to true, the target device will only send
// updates to the collector upon a change in data value
bool suppress_unchanged = 3;
// Maximum time in ms the target device may go without sending
// a message to the collector. If this time expires with
// suppress-unchanged set, the target device must send an update
// message regardless if the data values have changed.
uint32 max_silent_interval = 4;
// Time in ms between collection and transmission of the
// specified data to the collector platform. The target device
// will sample the corresponding data (e.g,. a counter) and
// immediately send to the collector destination.
//
// If sample-frequency is set to 0, then the network device
// must emit an update upon every datum change.
uint32 sample_frequency = 5;
}
// Configure subscription request additional features.
message SubscriptionAdditionalConfig {
// limit the number of records sent in the stream
int32 limit_records = 1;
// limit the time the stream remains open
int32 limit_time_seconds = 2;
}
// Reply to inline subscription for data at the specified path is done in
// two-folds.
// 1. Reply data message sent out using out-of-band channel.
// 2. Telemetry data send back on the same connection as the
// subscription request.
// 1. Reply data message sent out using out-of-band channel.
message SubscriptionReply {
// Response message to a telemetry subscription creation or
// get request.
SubscriptionResponse response = 1;
// List of data models paths and filters
// which are used in a telemetry operation.
repeated Path path_list = 2;
}
// Response message to a telemetry subscription creation or get request.
message SubscriptionResponse {
// Unique id for the subscription on the device. This is
// generated by the device and returned in a subscription
// request or when listing existing subscriptions
uint32 subscription_id = 1;
}
// 2. Telemetry data send back on the same connection as the
// subscription request.
message OpenConfigData {
// router name:export IP address
string system_id = 1;
// line card / RE (slot number)
uint32 component_id = 2;
// PFE (if applicable)
uint32 sub_component_id = 3;
// Path specification for elements of OpenConfig data models
string path = 4;
// Sequence number, monotonically increasing for each
// system_id, component_id, sub_component_id + path.
uint64 sequence_number = 5;
// timestamp (milliseconds since epoch)
uint64 timestamp = 6;
// List of key-value pairs
repeated KeyValue kv = 7;
}
// Simple Key-value, where value could be one of scalar types
message KeyValue {
// Key
string key = 1;
// One of possible values
oneof value {
double double_value = 5;
int64 int_value = 6;
uint64 uint_value = 7;
sint64 sint_value = 8;
bool bool_value = 9;
string str_value = 10;
bytes bytes_value = 11;
}
}
// Message sent for a telemetry subscription cancellation request
message CancelSubscriptionRequest {
// Subscription identifier as returned by the device when
// subscription was requested
uint32 subscription_id = 1;
}
// Reply to telemetry subscription cancellation request
message CancelSubscriptionReply {
// Return code
ReturnCode code = 1;
// Return code string
string code_str = 2;
};
// Result of the operation
enum ReturnCode {
SUCCESS = 0;
NO_SUBSCRIPTION_ENTRY = 1;
UNKNOWN_ERROR = 2;
}
// Message sent for a telemetry get request
message GetSubscriptionsRequest {
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers
uint32 subscription_id = 1;
}
// Reply to telemetry subscription get request
message GetSubscriptionsReply {
// List of current telemetry subscriptions
repeated SubscriptionReply subscription_list = 1;
}
// Message sent for telemetry agent operational states request
message GetOperationalStateRequest {
// Per-subscription_id level operational state can be requested.
//
// Subscription identifier as returned by the device when
// subscription was requested
// --- or ---
// 0xFFFFFFFF for all subscription identifiers including agent-level
// operational stats
// --- or ---
// If subscription_id is not present then sent only agent-level
// operational stats
uint32 subscription_id = 1;
// Control verbosity of the output
VerbosityLevel verbosity = 2;
}
// Verbosity Level
enum VerbosityLevel {
DETAIL = 0;
TERSE = 1;
BRIEF = 2;
}
// Reply to telemetry agent operational states request
message GetOperationalStateReply {
// List of key-value pairs where
// key = operational state definition
// value = operational state value
repeated KeyValue kv = 1;
}
// Message sent for a data encoding request
message DataEncodingRequest {
}
// Reply to data encodings supported request
message DataEncodingReply {
repeated EncodingType encoding_list = 1;
}
// Encoding Type Supported
enum EncodingType {
UNDEFINED = 0;
XML = 1;
JSON_IETF = 2;
PROTO3 = 3;
}
为了执行服务调用 (rpc TelemetrySubscribe),我首先需要读取具有订阅 ID 的 header,然后开始读取消息。现在,使用 Java 我能够连接该服务,我确实引入了拦截器但是当我 print/retrieve header 它是空的。我调用拦截器的代码如下,
ClientInterceptor interceptor = new HeaderClientInterceptor();
originChannel = OkHttpChannelBuilder.forAddress(host, port)
.usePlaintext(true)
.build();
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
telemetryStub = OpenConfigTelemetryGrpc.newStub(channel);
这是读取元数据的拦截器代码。
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
Key<String> CUSTOM_HEADER_KEY = Metadata.Key.of("responseKEY", Metadata.ASCII_STRING_MARSHALLER);
System.out.println("Contains Key?? "+headers.containsKey(CUSTOM_HEADER_KEY));
想知道有没有其他方法可以读取其中包含订阅 ID 的元数据或第一条消息?我只需要读取具有订阅 ID 的第一条消息,并且 return 与服务器的订阅 ID 相同,以便可以开始流式传输我有等效的 Python 代码使用相同的 proto 文件并且它通过代码与服务器通信以下仅供参考:
sub_req = SubscribeRequestMsg("host",port)
data_itr = stub.telemetrySubscribe(sub_req, _TIMEOUT_SECONDS)
metadata = data_itr.initial_metadata()
if metadata[0][0] == "responseKey":
metainfo = metadata[0][1]
print metainfo
subreply = agent_pb2.SubscriptionReply()
subreply.SetInParent()
google.protobuf.text_format.Merge(metainfo, subreply)
if subreply.response.subscription_id:
SUB_ID = subreply.response.subscription_id
从上面的 python 代码我可以轻松检索元数据 object,不确定如何使用 Java 检索元数据?
阅读元数据后,我得到的是:Metadata({content-type=[application/grpc], grpc-encoding=[identity], grpc-accept-encoding=[identity,deflate,gzip]})
但我知道从元数据到它还有一行,即
response {
subscription_id: 2
}
我如何从 Header 中提取最后一个响应,其中包含订阅 ID。我确实尝试了很多选择,但我在这里迷路了。
您使用的方法用于请求元数据,而不是响应元数据:
public void start(Listener<RespT> responseListener, Metadata headers) {
对于响应元数据,您需要 ClientCall.Listener
并等待 onHeaders 回调:
public void onHeaders(Metadata headers)
我确实觉得您提到的元数据的用法似乎很奇怪。元数据通常用于附加错误详细信息或不特定于 RPC 方法的横切功能(如身份验证、跟踪等)。
通常使用 ClientInterceptor 是不方便的,因为您需要维护对它的引用以便将数据拉回。在您的情况下,数据实际上是元数据。可以更轻松地访问元数据的一种方法是将其放在 Context
.
例如,您可以为订阅 ID 创建一个 Context.Key
。在您的客户端拦截器中,您可以提取所需的 Metadata
header,并使用 Context.current().withValue(key, metadata)
将其放入 Context
中。在你的 StreamObserver
中,你可以通过调用 key.get(Context.current())
来提取这个 This。这假设您使用的是异步 API,而不是阻塞 API.
之所以更难,是因为通常元数据是关于呼叫的信息,但与呼叫本身没有直接关系。它用于跟踪、编码、统计、取消等类似的事情。如果某些事情改变了你处理请求的方式,它可能需要直接进入请求本身,而不是站在一边。
万一这对其他人有帮助:
我需要根据 request/response 在我的 gRPC-java 服务器中写一个特定的响应 header。
我最后做的是使用 Context::withValue
将响应 header 值存储在 Context
中(它不会修改现有的上下文,而是实际上创建一个新的上下文),然后在 Context::run
回调中调用请求服务处理程序方法的 StreamObserver::onNext
。 StremableObserver::onNext
从我的 ServerInterceptor
调用我的集合中的 ServerCall::sendHeaders
。在 sendHeaders
中,它可以读取我存储的上下文中的值并设置响应 header 值。
我认为这与@carl-mastrangelo的方法类似,只是可以拼写多一点。
public enum MyServerInterceptor implements ServerInterceptor {
INSTANCE;
public static final Metadata.Key<String> METADATA_KEY =
Metadata.Key.of("fish", ASCII_STRING_MARSHALLER);
public static final Context.Key<String> CONTEXT_KEY = Context.key("dog");
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
ServerCall<ReqT, RespT> myServerCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
String value = CONTEXT_KEY.get(Context.current());
responseHeaders.put(METADATA_KEY, value);
super.sendHeaders(responseHeaders);
}
};
return next.startCall(myServerCall, requestHeaders);
}
}
public class MyService extends MyServiceGrpc.MyServiceyImplBase {
@Override
public void serviceMethod(MyRequest request, StreamObserver<MyResponse> responseObserver) {
MyResponse response = new MyResponse();
Context.current().withValue(Context.key("dog"), "cat").run(() -> {
responseObserver.onNext(response);
});
responseObserver.onCompleted();
}
}
这里的关键部分是 responseObserver::onNext
调用 ForwardingServerCall.SimpleForwardingServerCall::sendHeaders
。
如果有更好的方法请告诉我。这一切似乎比我想要的要复杂。