将 InputStream 转换为 StringBuilder 并将 StringBuilder 转换为 OutputStream

Convert InputStream to StringBuilder and StringBuilder to OutputStream

我有一个特定的用例,我需要能够转换我收到的数据:

我不想在字符串之间来回转换,因为我已经有了 StringBuilder(但如果无论哪种方式,它都等同于内存中包含完整内容的字符串,那么我可以将它们更改为字符串) . 我不明白的是,当我在下面的 类 中创建它们之间的不同传输类型进行测试时,即使我已经将每个传输 类 设为单独的线程(如 Callable<void>) 所以我很困惑为什么会这样,我什至尝试用 Runnbale/Thread 替换并做 start(), join() 仍然有同样的问题;我想这是我看不到的一些编码逻辑错误。

import java.io.*;
import java.util.concurrent.*;

public class Test {
    
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
            sb.append(i+".123401234").append(System.lineSeparator());
        return sb.toString();
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes());    // data
        PipedInputStream in = new PipedInputStream();                               // intermediary pipe
        PipedOutputStream out = new PipedOutputStream(in);                          // connect OutputStream to pipe InputStream 
        StringBuilder final_out = new StringBuilder();
        NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
        NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out);   // InputStream to StringBuilder 
        Future<Void> f1 = executor.submit(ntnt);
        Future<Void> f2 = executor.submit(ntct);
        f1.get(); f2.get();     // deadlock here ? 
        System.out.println(final_out);
        System.out.println("Done");
    }

    public static class StreamTransfer implements Callable<Void> {
        public static final int BUFFER_SIZE = 1024;
        private InputStream in;
        private OutputStream out;
        
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }
        
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            BufferedOutputStream bos = new BufferedOutputStream(out);
            byte[] buffer = new byte[BUFFER_SIZE];
            while (bis.read(buffer) != -1) 
                bos.write(buffer, 0, BUFFER_SIZE);
            bos.flush();
            return null;
        }
    }

    public static class NativeToCustomTransfer implements Callable<Void> {
        private InputStream in;
        private StringBuilder sb;
        
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }
        
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
            while (bis.read(buffer) != -1) 
                sb.append(new String(buffer));
            return null;
        }
    }
    
    public static class CustomToNativeTransfer extends StreamTransfer { 
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            super(new ByteArrayInputStream(in.toString().getBytes()), out);
        }
    }
    
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }
    
    public static class CustomToCustomTransfer {
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            in.chars().forEach(out::append);
        } 
    }
}

编辑:

修正了 DuncG 和 Holger 指出的所有错误后,代码实际上按计划运行,不再死锁:

package test;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

public class Test {
    
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
            sb.append(i+".123401234").append(System.lineSeparator());
        return sb.toString();
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8));  // data
        PipedInputStream in = new PipedInputStream();                               // intermediary pipe
        PipedOutputStream out = new PipedOutputStream(in);                          // connect OutputStream to pipe InputStream 
        StringBuilder final_out = new StringBuilder();
        NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
        NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out);   // InputStream to StringBuilder 
        Future<Void> f1 = executor.submit(ntnt);
        Future<Void> f2 = executor.submit(ntct);
        f1.get(); f2.get();     // no more deadlock
        System.out.println(final_out);
        System.out.println("Done");
    }
    
    public static class NativeToCustomTransfer implements Callable<Void> {
        private InputStream in;
        private StringBuilder sb;
        
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }
        
        @Override
        public Void call() throws Exception {
            byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
            int read = 0;
            while ((read = in.read(buffer)) != -1) 
                sb.append(new String(buffer, 0, read, StandardCharsets.UTF_8));
            in.close();
            return null;
        }
    }
    
    public static class CustomToNativeTransfer extends StreamTransfer { 
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            super(new ByteArrayInputStream(in.toString().getBytes()), out);
        }
    }
    
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }
    
    public static class CustomToCustomTransfer {
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            in.chars().forEach(out::append);
        } 
    }
    
    public static class StreamTransfer implements Callable<Void> {
        public static final int BUFFER_SIZE = 1024;
        private InputStream in;
        private OutputStream out;
        
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }
        
        @Override
        public Void call() throws Exception {
            byte[] buffer = new byte[BUFFER_SIZE];
            int read = 0;
            while ((read = in.read(buffer)) != -1) 
                out.write(buffer, 0, read);
            in.close();
            out.close();
            return null;
        }
    }
}

1) None 流在完成后关闭,这意味着从管道读取将等待更多输入。改变冲洗关闭。为了更清晰的代码,请尝试使用资源来保证每个 call() 关闭 in/out 流。在以后的 JRE(可能是 9+?)中,您只需要在 try-with-resources 块中引用一个变量以使其自动关闭:

try(var try_will_autoclose_this_resource = out)
{
    while ((len = in.read(buffer)) != -1)
        out.write(buffer, 0, len);
}

2) in.read(buff) 的逻辑应该将值存储到局部变量中,以便它复制正确的长度

while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len); 

3) 你使用新字节[BUFFER_SIZE] 所以 I/O 是分块完成的,这意味着你所有的 BufferedInput.Output 流都是不必要的

4) 如果您的默认字符集是多字节,则使用 new String(buffer) 可能不起作用,因为最终字符的字节集可能不完整。