并行打印多个 SSH 命令 运行 的输出

Print output from multiple SSH commands running in parallel

我正在尝试编写日志工具,它将通过 ssh 连接到几个服务器,打开指定的日志文件并将结果打印到 System.out.print。现在,我已经实现了从一个来源获取日志。从 SSHManager class 开始,只需使用 Jsch 即可实现。

public void tailLogFile() {
     System.out.println("Starting to monitor logs for " + server.getIp());
     String command = "tail -f " + server.getLogFilePath();
     try {
         Channel channel = getSession().openChannel("exec");
         ((ChannelExec)channel).setCommand(command);
         InputStream commandOutput = channel.getInputStream();
         channel.connect();
         int readByte = commandOutput.read();

         while(readByte != 0xffffffff) {
             readByte = commandOutput.read();
             System.out.print(server.getFontColor().toString() + (char)readByte);
         }
         channel.disconnect();

     } catch (Exception e) {
         e.printStackTrace();
     }
 }

我猜其余的在这里无关紧要,它将彩色日志从 SSH 打印到我的 System.out。但是,这个程序的主要目的是将多个文件记录到一个地方。所以我尝试关注

for(SSHManager sshManager : getSshManagers()) {
       sshManager.tailLogFile();
}

它现在不工作,它从 for-loop 的第一次迭代开始 print 记录,并且由于 SSHManager.tailLogFile() 中的 while 没有终止,它一直从第一个来源打印日志。可以想象,我希望 SSHManager 的 n 个实例共享 System.out 并同时为我提供所有来源的输出。我想知道实现这一目标的最简单方法是什么?我需要深入了解并发性吗?

您必须以非阻塞方式连续读取所有输出流。

您可以使用 InputStream.available(),像这样:

ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();

ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand(
    "echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.connect();
channels.add(channel);

channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand(
    "sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.connect();
channels.add(channel);

ArrayList<InputStream> outputs = new ArrayList<InputStream>();
for (int i = 0; i < channels.size(); i++)
{
    outputs.add(channels.get(i).getInputStream());
}

Boolean anyOpened = true;
while (anyOpened)
{
    anyOpened = false;
    for (int i = 0; i < channels.size(); i++)
    {
        channel = channels.get(i);
        if (!channel.isClosed())
        {
            anyOpened = true;
            InputStream output = outputs.get(i);
            while (output.available() > 0)
            {
                int readByte = output.read();
                System.out.print((char)readByte);
            }
        }
    }
}

会得到你(假设 Linux 服务器):

one
eins
two
zwei
three
drei

请注意,答案读取 bytes/characters 的输出。它不保证在切换到另一个会话之前您会得到完整的一行。因此,您最终可能会混合来自不同会话的部分台词。在将缓冲区打印到输出之前,您应该在缓冲区中累积 bytes/characters,寻找新行。

至于我,我更喜欢为通道提供一个 OutputStream 来写入,而不是从它提供给我的 InputStream 中读取。

我会这样定义:

protected class MyOutputStream extends OutputStream {

    private StringBuilder stringBuilder = new StringBuilder();
    private Object lock;

    public MyOutputStream(Object lock) {
        this.lock = lock;
    }

    @Override
    public void write(int b) throws IOException {
        this.stringBuilder.append(b);

        if (b == '\n') {
            this.parseOutput();
        }
    }

    @Override
    public void write(byte[] b) throws IOException {
        String str = new String(b);
        this.stringBuilder.append(str);

        if (str.contains("\n")) {
            this.parseOutput();
        }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        String str = new String(b, off, len);
        this.stringBuilder.append(str);

        if (str.contains("\n")) {
            this.parseOutput();
        }
    }

    @Override
    public void flush() throws IOException {
    }

    @Override
    public void close() throws IOException {
        LOGGER.info("My output stream has closed");
    }

    private void parseOutput() throws IOException {
        // we split the text but we make sure not to drop the empty strings or the trailing char
        String[] lines = this.stringBuilder.toString().split("\n", -1);

        int num = 0;
        int last = lines.length - 1;
        String trunkated = null;

        // synchronize the writing
        synchronized (this.lock) {
            for (String line : lines) {
                // Dont treat the trunkated last line
                if (num == last && line.length() > 0) {
                    trunkated = line;
                    break;
                }
                // write a full line    
                System.out.print(line);     

                num++;
            }
        }

        // flush the buffer and keep the last trunkated line
        this.stringBuilder.setLength(0);
        if (trunkated != null) {
            this.stringBuilder.append(trunkated);
        }
    }
}

所以用法是这样的:

ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
Object lock = new Object();

ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand("echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);

channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand("sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);

for (ChannelExec channel : channels) {
    while (!channel.isClosed()) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    } 
}

好处是您可以受益于 Jsch 通道中已经存在的多线程,然后您可以避免日志泛滥导致其他日志无法打印的问题。 用不同的流处理每个日志也更容易和更清晰 class。 StringBuilder 是累积字符直到获得完整行的好方法。

另请注意,一次写一整行可避免每个字符调用一个函数,并将写入的字符数与您的系统相乘 server.getFontColor().toString()

一定要正确锁定,我写的代码没有测试