将 InputStream 转换为 StringBuilder 并将 StringBuilder 转换为 OutputStream
Convert InputStream to StringBuilder and StringBuilder to OutputStream
我有一个特定的用例,我需要能够转换我收到的数据:
- InputStream 并存储在 StringBuilder 中
- StringBuilder 并写入 OutputStream
我不想在字符串之间来回转换,因为我已经有了 StringBuilder(但如果无论哪种方式,它都等同于内存中包含完整内容的字符串,那么我可以将它们更改为字符串) .
我不明白的是,当我在下面的 类 中创建它们之间的不同传输类型进行测试时,即使我已经将每个传输 类 设为单独的线程(如 Callable<void>
) 所以我很困惑为什么会这样,我什至尝试用 Runnbale/Thread 替换并做 start()
, join()
仍然有同样的问题;我想这是我看不到的一些编码逻辑错误。
import java.io.*;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes()); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // deadlock here ?
System.out.println(final_out);
System.out.println("Done");
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
BufferedOutputStream bos = new BufferedOutputStream(out);
byte[] buffer = new byte[BUFFER_SIZE];
while (bis.read(buffer) != -1)
bos.write(buffer, 0, BUFFER_SIZE);
bos.flush();
return null;
}
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
while (bis.read(buffer) != -1)
sb.append(new String(buffer));
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
}
编辑:
修正了 DuncG 和 Holger 指出的所有错误后,代码实际上按计划运行,不再死锁:
package test;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8)); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // no more deadlock
System.out.println(final_out);
System.out.println("Done");
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
sb.append(new String(buffer, 0, read, StandardCharsets.UTF_8));
in.close();
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
out.write(buffer, 0, read);
in.close();
out.close();
return null;
}
}
}
1) None 流在完成后关闭,这意味着从管道读取将等待更多输入。改变冲洗关闭。为了更清晰的代码,请尝试使用资源来保证每个 call() 关闭 in/out 流。在以后的 JRE(可能是 9+?)中,您只需要在 try-with-resources 块中引用一个变量以使其自动关闭:
try(var try_will_autoclose_this_resource = out)
{
while ((len = in.read(buffer)) != -1)
out.write(buffer, 0, len);
}
2) in.read(buff) 的逻辑应该将值存储到局部变量中,以便它复制正确的长度
while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len);
3) 你使用新字节[BUFFER_SIZE] 所以 I/O 是分块完成的,这意味着你所有的 BufferedInput.Output 流都是不必要的
4) 如果您的默认字符集是多字节,则使用 new String(buffer) 可能不起作用,因为最终字符的字节集可能不完整。
我有一个特定的用例,我需要能够转换我收到的数据:
- InputStream 并存储在 StringBuilder 中
- StringBuilder 并写入 OutputStream
我不想在字符串之间来回转换,因为我已经有了 StringBuilder(但如果无论哪种方式,它都等同于内存中包含完整内容的字符串,那么我可以将它们更改为字符串) .
我不明白的是,当我在下面的 类 中创建它们之间的不同传输类型进行测试时,即使我已经将每个传输 类 设为单独的线程(如 Callable<void>
) 所以我很困惑为什么会这样,我什至尝试用 Runnbale/Thread 替换并做 start()
, join()
仍然有同样的问题;我想这是我看不到的一些编码逻辑错误。
import java.io.*;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes()); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // deadlock here ?
System.out.println(final_out);
System.out.println("Done");
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
BufferedOutputStream bos = new BufferedOutputStream(out);
byte[] buffer = new byte[BUFFER_SIZE];
while (bis.read(buffer) != -1)
bos.write(buffer, 0, BUFFER_SIZE);
bos.flush();
return null;
}
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
while (bis.read(buffer) != -1)
sb.append(new String(buffer));
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
}
编辑:
修正了 DuncG 和 Holger 指出的所有错误后,代码实际上按计划运行,不再死锁:
package test;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8)); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // no more deadlock
System.out.println(final_out);
System.out.println("Done");
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
sb.append(new String(buffer, 0, read, StandardCharsets.UTF_8));
in.close();
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
out.write(buffer, 0, read);
in.close();
out.close();
return null;
}
}
}
1) None 流在完成后关闭,这意味着从管道读取将等待更多输入。改变冲洗关闭。为了更清晰的代码,请尝试使用资源来保证每个 call() 关闭 in/out 流。在以后的 JRE(可能是 9+?)中,您只需要在 try-with-resources 块中引用一个变量以使其自动关闭:
try(var try_will_autoclose_this_resource = out)
{
while ((len = in.read(buffer)) != -1)
out.write(buffer, 0, len);
}
2) in.read(buff) 的逻辑应该将值存储到局部变量中,以便它复制正确的长度
while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len);
3) 你使用新字节[BUFFER_SIZE] 所以 I/O 是分块完成的,这意味着你所有的 BufferedInput.Output 流都是不必要的
4) 如果您的默认字符集是多字节,则使用 new String(buffer) 可能不起作用,因为最终字符的字节集可能不完整。