并行打印多个 SSH 命令 运行 的输出
Print output from multiple SSH commands running in parallel
我正在尝试编写日志工具,它将通过 ssh 连接到几个服务器,打开指定的日志文件并将结果打印到 System.out.print
。现在,我已经实现了从一个来源获取日志。从 SSHManager
class 开始,只需使用 Jsch
即可实现。
public void tailLogFile() {
System.out.println("Starting to monitor logs for " + server.getIp());
String command = "tail -f " + server.getLogFilePath();
try {
Channel channel = getSession().openChannel("exec");
((ChannelExec)channel).setCommand(command);
InputStream commandOutput = channel.getInputStream();
channel.connect();
int readByte = commandOutput.read();
while(readByte != 0xffffffff) {
readByte = commandOutput.read();
System.out.print(server.getFontColor().toString() + (char)readByte);
}
channel.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
我猜其余的在这里无关紧要,它将彩色日志从 SSH 打印到我的 System.out
。但是,这个程序的主要目的是将多个文件记录到一个地方。所以我尝试关注
for(SSHManager sshManager : getSshManagers()) {
sshManager.tailLogFile();
}
它现在不工作,它从 for-loop
的第一次迭代开始 print
记录,并且由于 SSHManager.tailLogFile()
中的 while
没有终止,它一直从第一个来源打印日志。可以想象,我希望 SSHManager
的 n 个实例共享 System.out
并同时为我提供所有来源的输出。我想知道实现这一目标的最简单方法是什么?我需要深入了解并发性吗?
您必须以非阻塞方式连续读取所有输出流。
您可以使用 InputStream.available()
,像这样:
ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand(
"echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.connect();
channels.add(channel);
channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand(
"sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.connect();
channels.add(channel);
ArrayList<InputStream> outputs = new ArrayList<InputStream>();
for (int i = 0; i < channels.size(); i++)
{
outputs.add(channels.get(i).getInputStream());
}
Boolean anyOpened = true;
while (anyOpened)
{
anyOpened = false;
for (int i = 0; i < channels.size(); i++)
{
channel = channels.get(i);
if (!channel.isClosed())
{
anyOpened = true;
InputStream output = outputs.get(i);
while (output.available() > 0)
{
int readByte = output.read();
System.out.print((char)readByte);
}
}
}
}
会得到你(假设 Linux 服务器):
one
eins
two
zwei
three
drei
请注意,答案读取 bytes/characters 的输出。它不保证在切换到另一个会话之前您会得到完整的一行。因此,您最终可能会混合来自不同会话的部分台词。在将缓冲区打印到输出之前,您应该在缓冲区中累积 bytes/characters,寻找新行。
至于我,我更喜欢为通道提供一个 OutputStream 来写入,而不是从它提供给我的 InputStream 中读取。
我会这样定义:
protected class MyOutputStream extends OutputStream {
private StringBuilder stringBuilder = new StringBuilder();
private Object lock;
public MyOutputStream(Object lock) {
this.lock = lock;
}
@Override
public void write(int b) throws IOException {
this.stringBuilder.append(b);
if (b == '\n') {
this.parseOutput();
}
}
@Override
public void write(byte[] b) throws IOException {
String str = new String(b);
this.stringBuilder.append(str);
if (str.contains("\n")) {
this.parseOutput();
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
String str = new String(b, off, len);
this.stringBuilder.append(str);
if (str.contains("\n")) {
this.parseOutput();
}
}
@Override
public void flush() throws IOException {
}
@Override
public void close() throws IOException {
LOGGER.info("My output stream has closed");
}
private void parseOutput() throws IOException {
// we split the text but we make sure not to drop the empty strings or the trailing char
String[] lines = this.stringBuilder.toString().split("\n", -1);
int num = 0;
int last = lines.length - 1;
String trunkated = null;
// synchronize the writing
synchronized (this.lock) {
for (String line : lines) {
// Dont treat the trunkated last line
if (num == last && line.length() > 0) {
trunkated = line;
break;
}
// write a full line
System.out.print(line);
num++;
}
}
// flush the buffer and keep the last trunkated line
this.stringBuilder.setLength(0);
if (trunkated != null) {
this.stringBuilder.append(trunkated);
}
}
}
所以用法是这样的:
ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
Object lock = new Object();
ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand("echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);
channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand("sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);
for (ChannelExec channel : channels) {
while (!channel.isClosed()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
好处是您可以受益于 Jsch 通道中已经存在的多线程,然后您可以避免日志泛滥导致其他日志无法打印的问题。
用不同的流处理每个日志也更容易和更清晰 class。
StringBuilder 是累积字符直到获得完整行的好方法。
另请注意,一次写一整行可避免每个字符调用一个函数,并将写入的字符数与您的系统相乘
server.getFontColor().toString()
一定要正确锁定,我写的代码没有测试
我正在尝试编写日志工具,它将通过 ssh 连接到几个服务器,打开指定的日志文件并将结果打印到 System.out.print
。现在,我已经实现了从一个来源获取日志。从 SSHManager
class 开始,只需使用 Jsch
即可实现。
public void tailLogFile() {
System.out.println("Starting to monitor logs for " + server.getIp());
String command = "tail -f " + server.getLogFilePath();
try {
Channel channel = getSession().openChannel("exec");
((ChannelExec)channel).setCommand(command);
InputStream commandOutput = channel.getInputStream();
channel.connect();
int readByte = commandOutput.read();
while(readByte != 0xffffffff) {
readByte = commandOutput.read();
System.out.print(server.getFontColor().toString() + (char)readByte);
}
channel.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
我猜其余的在这里无关紧要,它将彩色日志从 SSH 打印到我的 System.out
。但是,这个程序的主要目的是将多个文件记录到一个地方。所以我尝试关注
for(SSHManager sshManager : getSshManagers()) {
sshManager.tailLogFile();
}
它现在不工作,它从 for-loop
的第一次迭代开始 print
记录,并且由于 SSHManager.tailLogFile()
中的 while
没有终止,它一直从第一个来源打印日志。可以想象,我希望 SSHManager
的 n 个实例共享 System.out
并同时为我提供所有来源的输出。我想知道实现这一目标的最简单方法是什么?我需要深入了解并发性吗?
您必须以非阻塞方式连续读取所有输出流。
您可以使用 InputStream.available()
,像这样:
ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand(
"echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.connect();
channels.add(channel);
channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand(
"sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.connect();
channels.add(channel);
ArrayList<InputStream> outputs = new ArrayList<InputStream>();
for (int i = 0; i < channels.size(); i++)
{
outputs.add(channels.get(i).getInputStream());
}
Boolean anyOpened = true;
while (anyOpened)
{
anyOpened = false;
for (int i = 0; i < channels.size(); i++)
{
channel = channels.get(i);
if (!channel.isClosed())
{
anyOpened = true;
InputStream output = outputs.get(i);
while (output.available() > 0)
{
int readByte = output.read();
System.out.print((char)readByte);
}
}
}
}
会得到你(假设 Linux 服务器):
one
eins
two
zwei
three
drei
请注意,答案读取 bytes/characters 的输出。它不保证在切换到另一个会话之前您会得到完整的一行。因此,您最终可能会混合来自不同会话的部分台词。在将缓冲区打印到输出之前,您应该在缓冲区中累积 bytes/characters,寻找新行。
至于我,我更喜欢为通道提供一个 OutputStream 来写入,而不是从它提供给我的 InputStream 中读取。
我会这样定义:
protected class MyOutputStream extends OutputStream {
private StringBuilder stringBuilder = new StringBuilder();
private Object lock;
public MyOutputStream(Object lock) {
this.lock = lock;
}
@Override
public void write(int b) throws IOException {
this.stringBuilder.append(b);
if (b == '\n') {
this.parseOutput();
}
}
@Override
public void write(byte[] b) throws IOException {
String str = new String(b);
this.stringBuilder.append(str);
if (str.contains("\n")) {
this.parseOutput();
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
String str = new String(b, off, len);
this.stringBuilder.append(str);
if (str.contains("\n")) {
this.parseOutput();
}
}
@Override
public void flush() throws IOException {
}
@Override
public void close() throws IOException {
LOGGER.info("My output stream has closed");
}
private void parseOutput() throws IOException {
// we split the text but we make sure not to drop the empty strings or the trailing char
String[] lines = this.stringBuilder.toString().split("\n", -1);
int num = 0;
int last = lines.length - 1;
String trunkated = null;
// synchronize the writing
synchronized (this.lock) {
for (String line : lines) {
// Dont treat the trunkated last line
if (num == last && line.length() > 0) {
trunkated = line;
break;
}
// write a full line
System.out.print(line);
num++;
}
}
// flush the buffer and keep the last trunkated line
this.stringBuilder.setLength(0);
if (trunkated != null) {
this.stringBuilder.append(trunkated);
}
}
}
所以用法是这样的:
ArrayList<ChannelExec> channels = new ArrayList<ChannelExec>();
Object lock = new Object();
ChannelExec channel;
channel = (ChannelExec)session1.openChannel("exec");
channel.setCommand("echo one && sleep 2 && echo two && sleep 2 && echo three");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);
channel = (ChannelExec)session2.openChannel("exec");
channel.setCommand("sleep 1 && echo eins && sleep 2 && echo zwei && sleep 2 && echo drei");
channel.setOutputStream(new MyOutputStream(lock));
channel.connect();
channels.add(channel);
for (ChannelExec channel : channels) {
while (!channel.isClosed()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
好处是您可以受益于 Jsch 通道中已经存在的多线程,然后您可以避免日志泛滥导致其他日志无法打印的问题。 用不同的流处理每个日志也更容易和更清晰 class。 StringBuilder 是累积字符直到获得完整行的好方法。
另请注意,一次写一整行可避免每个字符调用一个函数,并将写入的字符数与您的系统相乘 server.getFontColor().toString()
一定要正确锁定,我写的代码没有测试