使用 Netty 进行管道流式传输
Piped Streaming with Netty
我正在尝试将 "thumbnail generator" 实施为微服务。我认为这样的东西可能最适合用作 TCP 服务器,因此在简要调查了几个选项后,我选择了 Netty。为了使服务尽可能节省内存,我宁愿避免将完整图像加载到内存中,因此一直在尝试构建一个管道,其 "ThumbnailHandler" 可以使用管道流来利用 Netty 的分块读取因此,当 Netty 接收到更多字节时,缩略图生成器可以遍历更多流。不幸的是,我对 Netty 或 NIO 模式总体上还不够熟悉,不知道我是否以最好的方式解决这个问题,而且我什至无法像我期望的那样使用简化版本。
这是我的服务器设置:
public class ThumbnailerServer {
private int port;
public ThumbnailerServer(int port) {
this.port = port;
}
public void run() throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(UdtChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("handler", new ThumbnailerServerHandler());
}
});
// bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
acceptGroup.shutdownGracefully();
}
}
}
缩略图处理程序:
public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
private PipedInputStream toThumbnailer = new PipedInputStream();
private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5));
private ListenableFuture<OutputStream> future;
public ThumbnailerServerHandler() throws IOException {
super(ByteBuf.class, true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
future.addListener(() -> {
try {
ctx.writeAndFlush(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, executor);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.fromClient.close();
this.toThumbnailer.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int readableBytes = msg.readableBytes();
msg.readBytes(this.fromClient, readableBytes);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Encountered error during communication", cause);
ctx.close();
}
}
这是我的简化 "thumbnailer",直到我让整个流程开始工作:
public class ThumbnailGenerator {
public static OutputStream generate(InputStream toThumbnailer) {
OutputStream stream = new ByteArrayOutputStream();
try {
IOUtils.copy(toThumbnailer, stream);
} catch (IOException e) {
e.printStackTrace();
}
return stream;
}
}
- 像这样在 handlerAdded 方法中分拆一个异步任务是否合适?还有更多 "netty" 方法吗?
- IOUtils.copy 应该并且确实会阻塞(由于对管道输入流的读取),直到有数据可供读取,这就是为什么我将它卸载到执行程序池中,因为我不能如果我想继续接收字节,请在处理程序中阻止。但是,我发现这个 永远不会 完成,但它确实取得了进展 。那是因为我从未遇到过 EOF 字节 (-1) 吗?我怎样才能让这个流程发挥作用?
- 我是否缺少 netty 中可以简化此过程的结构?我想过将它实现为一个解码器,直到它拥有整个流才解码,但随后我会将所有内容加载到内存中。
好吧,事实证明我有一些误解,这些误解解释了为什么我无法正常工作。
1) 许多文件类型没有 so-called 个终止字节。事实上,EOF 字节(最常见的是 -1,因为它是一个溢出值)通常是阅读器提供的一种实现,用于向他们的消费者传达他们已经到达内容的末尾。它通常不存在于文件本身中。
2) channelReadComplete 并不像听起来那么清晰。 channelReadComplete 在达到 netty 中配置的最大读取次数(默认为 10)后调用,或者当它有理由相信消息已完全发送时调用,如读取空缓冲区或接收小于配置的块大小。
至于为什么看起来输入流副本挂起,那是因为管道输入流从未产生终端值(这是 EOF 字节的 reader-implementation 的示例)。 PipedInputStreams 仅在驱动它们的输出流关闭后才指示 EOF。
为了使这个实现工作,我应该将消息计数增加到足够高的数量,并在最后一次读取 returns 小于块大小后调用可信的 channelReadComplete 。此时,关闭并重置下一条消息的输出流是安全的。关闭输出流将导致输入流最终返回那个 EOF 字节,其他一切都可以继续。
我正在尝试将 "thumbnail generator" 实施为微服务。我认为这样的东西可能最适合用作 TCP 服务器,因此在简要调查了几个选项后,我选择了 Netty。为了使服务尽可能节省内存,我宁愿避免将完整图像加载到内存中,因此一直在尝试构建一个管道,其 "ThumbnailHandler" 可以使用管道流来利用 Netty 的分块读取因此,当 Netty 接收到更多字节时,缩略图生成器可以遍历更多流。不幸的是,我对 Netty 或 NIO 模式总体上还不够熟悉,不知道我是否以最好的方式解决这个问题,而且我什至无法像我期望的那样使用简化版本。
这是我的服务器设置:
public class ThumbnailerServer {
private int port;
public ThumbnailerServer(int port) {
this.port = port;
}
public void run() throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(UdtChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("handler", new ThumbnailerServerHandler());
}
});
// bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
acceptGroup.shutdownGracefully();
}
}
}
缩略图处理程序:
public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
private PipedInputStream toThumbnailer = new PipedInputStream();
private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5));
private ListenableFuture<OutputStream> future;
public ThumbnailerServerHandler() throws IOException {
super(ByteBuf.class, true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
future.addListener(() -> {
try {
ctx.writeAndFlush(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, executor);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.fromClient.close();
this.toThumbnailer.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int readableBytes = msg.readableBytes();
msg.readBytes(this.fromClient, readableBytes);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Encountered error during communication", cause);
ctx.close();
}
}
这是我的简化 "thumbnailer",直到我让整个流程开始工作:
public class ThumbnailGenerator {
public static OutputStream generate(InputStream toThumbnailer) {
OutputStream stream = new ByteArrayOutputStream();
try {
IOUtils.copy(toThumbnailer, stream);
} catch (IOException e) {
e.printStackTrace();
}
return stream;
}
}
- 像这样在 handlerAdded 方法中分拆一个异步任务是否合适?还有更多 "netty" 方法吗?
- IOUtils.copy 应该并且确实会阻塞(由于对管道输入流的读取),直到有数据可供读取,这就是为什么我将它卸载到执行程序池中,因为我不能如果我想继续接收字节,请在处理程序中阻止。但是,我发现这个 永远不会 完成,但它确实取得了进展 。那是因为我从未遇到过 EOF 字节 (-1) 吗?我怎样才能让这个流程发挥作用?
- 我是否缺少 netty 中可以简化此过程的结构?我想过将它实现为一个解码器,直到它拥有整个流才解码,但随后我会将所有内容加载到内存中。
好吧,事实证明我有一些误解,这些误解解释了为什么我无法正常工作。
1) 许多文件类型没有 so-called 个终止字节。事实上,EOF 字节(最常见的是 -1,因为它是一个溢出值)通常是阅读器提供的一种实现,用于向他们的消费者传达他们已经到达内容的末尾。它通常不存在于文件本身中。
2) channelReadComplete 并不像听起来那么清晰。 channelReadComplete 在达到 netty 中配置的最大读取次数(默认为 10)后调用,或者当它有理由相信消息已完全发送时调用,如读取空缓冲区或接收小于配置的块大小。
至于为什么看起来输入流副本挂起,那是因为管道输入流从未产生终端值(这是 EOF 字节的 reader-implementation 的示例)。 PipedInputStreams 仅在驱动它们的输出流关闭后才指示 EOF。
为了使这个实现工作,我应该将消息计数增加到足够高的数量,并在最后一次读取 returns 小于块大小后调用可信的 channelReadComplete 。此时,关闭并重置下一条消息的输出流是安全的。关闭输出流将导致输入流最终返回那个 EOF 字节,其他一切都可以继续。