使用每个请求在 gRPC 客户端中打开和关闭通道
Open and close channel in the gRPC client with every request
我在 kafka 应用程序中有一个 gRPC 客户端。这意味着客户端将不断打开和关闭通道。
public class UserAgentClient {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final Config uaparserConfig = ConfigFactory.load().getConfig(ua);
private final ManagedChannel channel;
private final UserAgentServiceGrpc.UserAgentServiceBlockingStub userAgentBlockingStub;
public UserAgentParserClient() {
this(ManagedChannelBuilder.forAddress(uaConfig.getString("host"), uaConfig.getInt("port")).usePlaintext());
}
public UserAgentClient(ManagedChannelBuilder<?> usePlaintext) {
channel = usePlaintext.build();
userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
}
public UserAgentParseResponse getUserAgent(String userAgent ) {
UserAgentRequest request = UserAgentRequest.newBuilder().setUserAgent(userAgent).build();
UserAgentParseResponse response = null;
try {
response = userAgentBlockingStub.parseUserAgent(request);
} catch(Exception e) {
logger.warn("An exception has occurred during gRPC call to the user agent.", e.getMessage());
}
shutdown();
return response;
}
public void shutdown() {
try {
channel.shutdown();
} catch (InterruptedException ie) {
logger.warn("Interrupted exception during gRPC channel close", ie);
}
}
}
我想知道我是否可以一直保持频道打开?还是每次打新电话都要开一个频道?我想知道,因为我正在测试性能,如果我保持通道打开,它似乎会大大提高。另一方面,有什么我想念的吗?
创建新频道的开销很大,您应该尽可能长时间保持频道打开。
由于通道的打开和关闭很昂贵,所以我从我的客户端中完全删除了 channel = usePlaintext.build();
相反,我在我的 kafka Transformer
中打开和关闭它。在我的 class UserAgentDataEnricher
中实现了 Transformer
.
public class UserAgentDataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {
private UserAgentParserClient userAgentParserClient;
@Override
public void init(ProcessorContext context) {
this.context = context;
open();
// schedule a punctuate() method every 15 minutes
this.context.schedule(900000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
close();
open();
logger.info("Re-opening of user agent channel is initialized");
});
}
@Override
public void close() {
userAgentParserClient.shutdown();
}
private void open() {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
userAgentClient = new UserAgentClient(channel);
}
...
}
现在我这样初始化我的客户端:
public UserAgentClient(ManagedChannel channel) {
this.channel = channel;
userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
}
我在 kafka 应用程序中有一个 gRPC 客户端。这意味着客户端将不断打开和关闭通道。
public class UserAgentClient {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final Config uaparserConfig = ConfigFactory.load().getConfig(ua);
private final ManagedChannel channel;
private final UserAgentServiceGrpc.UserAgentServiceBlockingStub userAgentBlockingStub;
public UserAgentParserClient() {
this(ManagedChannelBuilder.forAddress(uaConfig.getString("host"), uaConfig.getInt("port")).usePlaintext());
}
public UserAgentClient(ManagedChannelBuilder<?> usePlaintext) {
channel = usePlaintext.build();
userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
}
public UserAgentParseResponse getUserAgent(String userAgent ) {
UserAgentRequest request = UserAgentRequest.newBuilder().setUserAgent(userAgent).build();
UserAgentParseResponse response = null;
try {
response = userAgentBlockingStub.parseUserAgent(request);
} catch(Exception e) {
logger.warn("An exception has occurred during gRPC call to the user agent.", e.getMessage());
}
shutdown();
return response;
}
public void shutdown() {
try {
channel.shutdown();
} catch (InterruptedException ie) {
logger.warn("Interrupted exception during gRPC channel close", ie);
}
}
}
我想知道我是否可以一直保持频道打开?还是每次打新电话都要开一个频道?我想知道,因为我正在测试性能,如果我保持通道打开,它似乎会大大提高。另一方面,有什么我想念的吗?
创建新频道的开销很大,您应该尽可能长时间保持频道打开。
由于通道的打开和关闭很昂贵,所以我从我的客户端中完全删除了 channel = usePlaintext.build();
相反,我在我的 kafka Transformer
中打开和关闭它。在我的 class UserAgentDataEnricher
中实现了 Transformer
.
public class UserAgentDataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {
private UserAgentParserClient userAgentParserClient;
@Override
public void init(ProcessorContext context) {
this.context = context;
open();
// schedule a punctuate() method every 15 minutes
this.context.schedule(900000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
close();
open();
logger.info("Re-opening of user agent channel is initialized");
});
}
@Override
public void close() {
userAgentParserClient.shutdown();
}
private void open() {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
userAgentClient = new UserAgentClient(channel);
}
...
}
现在我这样初始化我的客户端:
public UserAgentClient(ManagedChannel channel) {
this.channel = channel;
userAgentBlockingStub = UserAgentServiceGrpc.newBlockingStub(channel);
}