如何在 Netty 中分块列表 <Object>

How to chunk List<Object> in Netty

发送文件时,可以ctx.writeAndFlush(new ChunkedFile(new File("file.png")));

List<Object>怎么样?

该列表包含 Stringbytes of image

文档中有 ChunkedInput() 但我无法使用它。

更新

假设在我的处理程序中,在 channelRead0(ChannelHandlerContext ctx, Object o) 方法中我想发送 List<Object> 我已经完成了以下操作

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {

   List<Object> msg = new ArrayList<>();

   /**getting the bytes of image**/
   byte[] imageInByte;
   BufferedImage originalImage = ImageIO.read(new File(fileName));
   // convert BufferedImage to byte array
   ByteArrayOutputStream bAoS = new ByteArrayOutputStream();
   ImageIO.write(originalImage, "png", bAoS);
   bAoS.flush();
   imageInByte = baos.toByteArray();
   baos.close();

   msg.clear();
   msg.add(0, "String"); //add the String into List
   msg.add(1, imageInByte); //add the bytes of images into list

   /**Chunk the List<Object> and Send it just like the chunked file**/
   ctx.writeAndFlush(new ChunkedInput(DONT_KNOW_WHAT_TO_DO_HERE)); //

}

只需实施您自己的 ChunkedInput<ByteBuf>。按照 Netty 附带的实现,您可以按如下方式实现它:

public class ChunkedList implements ChunkedInput<ByteBuf> {
    private static final byte[] EMPTY = new byte[0];
    private byte[] previousPart = EMPTY;
    private final int chunkSize;
    private final Iterator<Object> iterator;

    public ChunkedList(int chunkSize, List<Object> objs) {
        //chunk size in bytes
        this.chunkSize = chunkSize;
        this.iterator = objs.iterator();
    }


    public ByteBuf readChunk(ChannelHandlerContext ctx) {
        return readChunk(ctx.alloc());
    }

    public ByteBuf readChunk(ByteBufAllocator allocator) {
        if (isEndOfInput())
            return null;
        else {
            ByteBuf buf = allocator.buffer(chunkSize);
            boolean release = true;
            try {
                int bytesRead = 0;
                if (previousPart.length > 0) {
                    if (previousPart.length > chunkSize) {
                        throw new IllegalStateException();
                    }
                    bytesRead += previousPart.length;
                    buf.writeBytes(previousPart);
                }
                boolean done = false;
                while (!done) {
                    if (!iterator.hasNext()) {
                        done = true;
                        previousPart = EMPTY;
                    } else {
                        Object o = iterator.next();
                        //depending on the encoding
                        byte[] bytes = o instanceof String ? ((String) o).getBytes() : (byte[]) o;
                        bytesRead += bytes.length;
                        if (bytesRead > chunkSize) {
                            done = true;
                            previousPart = bytes;
                        } else {
                            buf.writeBytes(bytes);
                        }
                    }
                }
                release = false;
            } finally {
                if (release)
                    buf.release();
            }
            return buf;
        }
    }

    public long length() {
        return -1;
    }

    public boolean isEndOfInput() {
        return !iterator.hasNext() && previousPart.length == 0;
    }

    public long progress() {
        return 0;
    }

    public void close(){
        //close
    }
}

为了编写 ChunkedContentNetty 附带了一个特殊的处理程序。参见 io.netty.handler.stream.ChunkedWriteHandler。所以只需添加到您的下游。这是文档中的引述:

A ChannelHandler that adds support for writing a large data stream asynchronously neither spending a lot of memory nor getting OutOfMemoryError. Large data streaming such as file transfer requires complicated state management in a ChannelHandler implementation. ChunkedWriteHandler manages such complicated states so that you can send a large data stream without difficulties.