Netty 请求超时
Netty request timeout
我正在尝试编写一个 HTTP 服务,该服务将从 HTTP 获取数据并使用 Netty 将其放入 Kafka。我需要在 m5.large EC2 实例上处理 20K RPS,这似乎非常可行。
代码很简单:
Server.java
public class Server {
    /**
     * Entry point: starts an Epoll-based Netty HTTP server that forwards request
     * bodies to Kafka. Blocks until the server channel closes, then shuts the
     * event loops and the producer down.
     */
    public static void main(final String[] args) throws Exception {
        final EventLoopGroup bossGroup = new EpollEventLoopGroup();
        final EventLoopGroup workerGroup = new EpollEventLoopGroup();
        // Keep a reference so the producer can be closed on shutdown; the original
        // leaked it (KafkaProducer owns sockets and a background sender thread).
        final Producer<String, ByteBuffer> producer = createProducer();
        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(EpollServerSocketChannel.class)
                    .childHandler(new RequestChannelInitializer(producer))
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // Flush any buffered records and release producer resources.
            producer.close();
        }
    }

    /**
     * Builds the single Kafka producer shared by all channels.
     * Retries are disabled and both the request and max-block timeouts are
     * capped at 10 s so a slow broker cannot stall send() indefinitely.
     */
    private static Producer<String, ByteBuffer> createProducer() {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        // 32 MiB TCP send buffer for sustained high-throughput publishing.
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);
        return new KafkaProducer<>(properties);
    }
}
RequestChannelInitializer.java
public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
private final Producer<String, ByteBuffer> producer;
public BidRequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
this.producer = producer;
}
@Override
public void initChannel(final SocketChannel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(new RequestHandler(producer));
}
}
RequestHandler.java
public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
private final Producer<String, ByteBuffer> producer;
public BidRequestHandler(final Producer<String, ByteBuffer> producer) {
this.producer = producer;
}
@Override
public void channelReadComplete(final ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
"test",
UUID.randomUUID().toString(),
msg.content().nioBuffer()
);
producer.send(record);
if (HttpUtil.isKeepAlive(msg)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
代码摘自官方文档。但是,有时我会在负载测试中遇到 Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms
异常。
据我了解,这意味着我的负载测试实例和服务实例之间建立了连接,但完成时间超过了 60 秒。这个简单程序的哪一部分可以阻塞这么久?
我调整了 Kafka 生产者:减少了它的超时时间。我知道 send
可能会阻塞,所以我增加了发送缓冲区,但没有帮助。
我还为服务用户增加了 ulimits
。
我在 OpenJDK 1.8.0_171 上运行,并且 securerandom.source
设置为 file:/dev/urandom
,因此对 randomUUID
的调用不应阻塞。
你说得对,这段代码里没有任何东西会阻塞。发送到 Kafka 的调用是异步的。我查看了您的代码,就我所见一切看起来都没有问题。
我要检查的几件事:
- 确保 AWS 中的安全组定义允许 Kafka 服务器和应用程序与 Zookeeper 通信。如果这只是 test/POC,您可以直接允许所有三个 instances/clusters 之间的全部流量。60 秒的超时让我怀疑是网络层面的超时,这可能意味着某个服务无法访问。
- 您是否尝试过更简单的测试,尝试在没有 Netty 依赖的情况下生成 Kafka?也许这有助于缩小问题范围。
我正在尝试编写一个 HTTP 服务,该服务将从 HTTP 获取数据并使用 Netty 将其放入 Kafka。我需要在 m5.large EC2 实例上处理 20K RPS,这似乎非常可行。
代码很简单:
Server.java
public class Server {
    /**
     * Entry point: starts an Epoll-based Netty HTTP server that forwards request
     * bodies to Kafka. Blocks until the server channel closes, then shuts the
     * event loops and the producer down.
     */
    public static void main(final String[] args) throws Exception {
        final EventLoopGroup bossGroup = new EpollEventLoopGroup();
        final EventLoopGroup workerGroup = new EpollEventLoopGroup();
        // Keep a reference so the producer can be closed on shutdown; the original
        // leaked it (KafkaProducer owns sockets and a background sender thread).
        final Producer<String, ByteBuffer> producer = createProducer();
        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(EpollServerSocketChannel.class)
                    .childHandler(new RequestChannelInitializer(producer))
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // Flush any buffered records and release producer resources.
            producer.close();
        }
    }

    /**
     * Builds the single Kafka producer shared by all channels.
     * Retries are disabled and both the request and max-block timeouts are
     * capped at 10 s so a slow broker cannot stall send() indefinitely.
     */
    private static Producer<String, ByteBuffer> createProducer() {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        // 32 MiB TCP send buffer for sustained high-throughput publishing.
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);
        return new KafkaProducer<>(properties);
    }
}
RequestChannelInitializer.java
public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
private final Producer<String, ByteBuffer> producer;
public BidRequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
this.producer = producer;
}
@Override
public void initChannel(final SocketChannel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(new RequestHandler(producer));
}
}
RequestHandler.java
public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
private final Producer<String, ByteBuffer> producer;
public BidRequestHandler(final Producer<String, ByteBuffer> producer) {
this.producer = producer;
}
@Override
public void channelReadComplete(final ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
"test",
UUID.randomUUID().toString(),
msg.content().nioBuffer()
);
producer.send(record);
if (HttpUtil.isKeepAlive(msg)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
代码摘自官方文档。但是,有时我会在负载测试中遇到 Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms
异常。
据我了解,这意味着我的负载测试实例和服务实例之间建立了连接,但完成时间超过了 60 秒。这个简单程序的哪一部分可以阻塞这么久?
我调整了 Kafka 生产者:减少了它的超时时间。我知道 send
可能会阻塞,所以我增加了发送缓冲区,但没有帮助。
我还为服务用户增加了 ulimits
。
我在 OpenJDK 1.8.0_171 上运行,并且 securerandom.source
设置为 file:/dev/urandom
,因此对 randomUUID
的调用不应阻塞。
你说得对,这段代码里没有任何东西会阻塞。发送到 Kafka 的调用是异步的。我查看了您的代码,就我所见一切看起来都没有问题。
我要检查的几件事:
- 确保 AWS 中的安全组定义允许 Kafka 服务器和应用程序与 Zookeeper 通信。如果这只是 test/POC,您可以直接允许所有三个 instances/clusters 之间的全部流量。60 秒的超时让我怀疑是网络层面的超时,这可能意味着某个服务无法访问。
- 您是否尝试过更简单的测试,尝试在没有 Netty 依赖的情况下生成 Kafka?也许这有助于缩小问题范围。