Apache Commons exec PumpStreamHandler 连续输入
Apache Commons exec PumpStreamHandler continuous input
我正在尝试使用 Apache Commons exec 解决与命令行进程的交互问题。我坚持使用以下代码:
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream ins = new ByteArrayOutputStream();
OutputStreamWriter ow = new OutputStreamWriter(ins);
BufferedWriter writer = new BufferedWriter(ow);
ByteArrayInputStream in = new ByteArrayInputStream(ins.toByteArray());
PumpStreamHandler psh = new PumpStreamHandler(out, null, in);
CommandLine cl = CommandLine.parse(initProcess);
DefaultExecutor exec = new DefaultExecutor();
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
exec.setStreamHandler(psh);
try {
exec.execute(cl, resultHandler);
int i = 0;
while (true) {
String o = out.toString();
if (!o.trim().isEmpty()) {
System.out.println(o);
out.reset();
}
// --- PROBLEM start ---
if (i == 3) {
writer.write(internalProcessCommand);
// string with or without trailing \n, both tested
writer.flush();
writer.close();
// tested even ins.write(internalProcessCommand.getBytes())
}
// --- PROBLEM end ---
Thread.sleep(3000);
i++;
}
} catch (ExecuteException e) {
System.err.println(e.getMessage());
}
我希望我的代码是清楚的。我连续读取 out
并在 3 秒后打印它,同时清除流。问题是输入到 in
传递给 PumpStreamHandler
。我需要从代码本身不断地动态地传递进程命令,就像我通过 CLI 与进程交互一样。当我简单地使用 System.in
作为 PumpStreamHandler
参数时,我可以很好地从控制台编写处理命令。我怎样才能设法从代码中传递字符串得到相同的结果?
编辑:
我也尝试连接 PipedInputStream
从 PipedOutputStream
接收数据,但似乎数据只能在关闭 PipedOutputStream
后读取,这使得它不可重用,因此我无法实现交互。
编辑 2:
自己解决了。解决方案在下面的答案中。吼。 :-)
好的,我使用内置工具而不是外部库解决了这个问题。多亏了独立线程 Process
' InputStream
:
,我才能够实现我的目标
ProcessBuilder builder = new ProcessBuilder(command);
Process process = builder.start();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
StreamReader outputReader = new StreamReader(process.getInputStream(), System.out);
outputReader.start();
StreamReader err = new StreamReader(process.getErrorStream(), System.err);
err.start();
for (int i = 0; i < 3; i++) {
Thread.sleep(5000);
writer.write(internalProcessCommand + "\n");
writer.flush();
}
writer.write("exit\n");
writer.flush();
while (process.isAlive()) {
System.out.println("alive?");
Thread.sleep(100);
}
System.out.println("dead");
outputReader.shutdown();
err.shutdown();
流式阅读器:
class StreamReader extends Thread {
private AtomicBoolean running = new AtomicBoolean(false);
private InputStream in;
private OutputStream out;
public StreamReader(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
running.set(true);
}
@Override
public void run() {
Scanner scanner = new Scanner(in);
PrintWriter writer = new PrintWriter(out, true);
while (running.get()) {
if (scanner.hasNextLine()) {
writer.println(scanner.nextLine());
}
}
scanner.close();
}
public void shutdown() {
running.set(false);
}
}
一个解决方案是复制PumpStreamHandler and StreamPump的实现,比如ImmediatePumpStreamHandler和ImmediateStreamPump,并进行以下两个更改:
- 在 ImmediateStreamPump 的第 108 行
os.write(buf, 0, length);
之后直接添加 os.flush();
。
- 将 ImmediatePumpStreamHandler 中第 269 行的
new StreamPumper(...)
更改为 new ImmediateStreamPump(...)
。
根据@champagniac 给出的答案,我创建了一个简单的修复程序,它通过仅替换 PumpStreamHandler
:
来引入额外的刷新
public class PumpStreamHandlerFixed extends PumpStreamHandler
{
public PumpStreamHandlerFixed()
{
super();
}
public PumpStreamHandlerFixed(OutputStream out, OutputStream err, InputStream input)
{
super(out, err, input);
}
public PumpStreamHandlerFixed(OutputStream out, OutputStream err)
{
super(out, err);
}
public PumpStreamHandlerFixed(OutputStream outAndErr)
{
super(outAndErr);
}
@Override
protected Thread createPump(InputStream is, OutputStream os, boolean closeWhenExhausted)
{
os = new AutoFlushingOutputStream(os);
final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
result.setDaemon(true);
return result;
}
}
class AutoFlushingOutputStream extends OutputStream
{
private final OutputStream decorated;
public AutoFlushingOutputStream(OutputStream decorated)
{
this.decorated = decorated;
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
this.decorated.write(b, off, len);
this.decorated.flush();
}
@Override
public void write(int b) throws IOException
{
this.decorated.write(b);
this.decorated.flush();
}
@Override
public void close() throws IOException
{
this.decorated.close();
}
@Override
public void flush() throws IOException
{
this.decorated.flush();
}
}
我正在尝试使用 Apache Commons exec 解决与命令行进程的交互问题。我坚持使用以下代码:
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream ins = new ByteArrayOutputStream();
OutputStreamWriter ow = new OutputStreamWriter(ins);
BufferedWriter writer = new BufferedWriter(ow);
ByteArrayInputStream in = new ByteArrayInputStream(ins.toByteArray());
PumpStreamHandler psh = new PumpStreamHandler(out, null, in);
CommandLine cl = CommandLine.parse(initProcess);
DefaultExecutor exec = new DefaultExecutor();
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
exec.setStreamHandler(psh);
try {
exec.execute(cl, resultHandler);
int i = 0;
while (true) {
String o = out.toString();
if (!o.trim().isEmpty()) {
System.out.println(o);
out.reset();
}
// --- PROBLEM start ---
if (i == 3) {
writer.write(internalProcessCommand);
// string with or without trailing \n, both tested
writer.flush();
writer.close();
// tested even ins.write(internalProcessCommand.getBytes())
}
// --- PROBLEM end ---
Thread.sleep(3000);
i++;
}
} catch (ExecuteException e) {
System.err.println(e.getMessage());
}
我希望我的代码是清楚的。我连续读取 out
并在 3 秒后打印它,同时清除流。问题是输入到 in
传递给 PumpStreamHandler
。我需要从代码本身不断地动态地传递进程命令,就像我通过 CLI 与进程交互一样。当我简单地使用 System.in
作为 PumpStreamHandler
参数时,我可以很好地从控制台编写处理命令。我怎样才能设法从代码中传递字符串得到相同的结果?
编辑:
我也尝试连接 PipedInputStream
从 PipedOutputStream
接收数据,但似乎数据只能在关闭 PipedOutputStream
后读取,这使得它不可重用,因此我无法实现交互。
编辑 2: 自己解决了。解决方案在下面的答案中。吼。 :-)
好的,我使用内置工具而不是外部库解决了这个问题。多亏了独立线程 Process
' InputStream
:
ProcessBuilder builder = new ProcessBuilder(command);
Process process = builder.start();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
StreamReader outputReader = new StreamReader(process.getInputStream(), System.out);
outputReader.start();
StreamReader err = new StreamReader(process.getErrorStream(), System.err);
err.start();
for (int i = 0; i < 3; i++) {
Thread.sleep(5000);
writer.write(internalProcessCommand + "\n");
writer.flush();
}
writer.write("exit\n");
writer.flush();
while (process.isAlive()) {
System.out.println("alive?");
Thread.sleep(100);
}
System.out.println("dead");
outputReader.shutdown();
err.shutdown();
流式阅读器:
class StreamReader extends Thread {
private AtomicBoolean running = new AtomicBoolean(false);
private InputStream in;
private OutputStream out;
public StreamReader(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
running.set(true);
}
@Override
public void run() {
Scanner scanner = new Scanner(in);
PrintWriter writer = new PrintWriter(out, true);
while (running.get()) {
if (scanner.hasNextLine()) {
writer.println(scanner.nextLine());
}
}
scanner.close();
}
public void shutdown() {
running.set(false);
}
}
一个解决方案是复制PumpStreamHandler and StreamPump的实现,比如ImmediatePumpStreamHandler和ImmediateStreamPump,并进行以下两个更改:
- 在 ImmediateStreamPump 的第 108 行
os.write(buf, 0, length);
之后直接添加os.flush();
。 - 将 ImmediatePumpStreamHandler 中第 269 行的
new StreamPumper(...)
更改为new ImmediateStreamPump(...)
。
根据@champagniac 给出的答案,我创建了一个简单的修复程序,它通过仅替换 PumpStreamHandler
:
public class PumpStreamHandlerFixed extends PumpStreamHandler
{
public PumpStreamHandlerFixed()
{
super();
}
public PumpStreamHandlerFixed(OutputStream out, OutputStream err, InputStream input)
{
super(out, err, input);
}
public PumpStreamHandlerFixed(OutputStream out, OutputStream err)
{
super(out, err);
}
public PumpStreamHandlerFixed(OutputStream outAndErr)
{
super(outAndErr);
}
@Override
protected Thread createPump(InputStream is, OutputStream os, boolean closeWhenExhausted)
{
os = new AutoFlushingOutputStream(os);
final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
result.setDaemon(true);
return result;
}
}
class AutoFlushingOutputStream extends OutputStream
{
private final OutputStream decorated;
public AutoFlushingOutputStream(OutputStream decorated)
{
this.decorated = decorated;
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
this.decorated.write(b, off, len);
this.decorated.flush();
}
@Override
public void write(int b) throws IOException
{
this.decorated.write(b);
this.decorated.flush();
}
@Override
public void close() throws IOException
{
this.decorated.close();
}
@Override
public void flush() throws IOException
{
this.decorated.flush();
}
}