JSch 与执行者

JSch with Executor

我在使用 ExecutorService 进行多个 SSH 作业(使用 JSch)时遇到问题 运行。

当 运行 执行单个作业时,它们会按预期执行。但是,当我尝试 运行 多个作业时,只有第一个作业完全执行。我已经将线程池更改为仅使用 1 个线程,认为这是一个端口问题,但它仍然无法正常工作。任何帮助将不胜感激。

ExecutorService ex = Executors.newFixedThreadPool(4);
Set<Future<Task>> set = new HashSet<Future<Task>>();
for (int i = 0; i < tasks.size(); i++) {
    Task task = tasks.get(i);            
    creds.setHostName(task.getFirewall().getAddress()); 
    System.out.println("Task(" + i + ")=" + task);
    Callable<Task> worker = new SSH.SSHWorker(creds, task, i);
    ex.submit(worker);
}

这是SSH

public class SSH {

    LoginCredentials creds;
    //ChannelExec channelExec;
    ChannelExec channelExec;
    BufferedReader consoleOutput;
    Session session;
    InputStream is;
    UI ui;
    String output = "";
    //  String command;
    public boolean debug = false;

    public SSH(LoginCredentials creds, String command) throws JSchException, IOException {
        // System.out.println("NEW SSH");
        this.creds = creds;
        consoleOutput = new BufferedReader(new InputStreamReader(System.in));
        ui = new UI();

        this.connect();
        Channel channel = session.openChannel("exec");
        this.sendCommand(channel, command);

    }

    public String getOutput() {
        return output;
    }

    public class UI extends MyUserInfo {

        String message;

        @Override
        public void showMessage(String message) {
            this.message = message;
            super.showMessage(message); //To change body of generated methods, choose Tools | Templates.
        }

        @Override
        public String getMessage() {
            return this.message;
        }
    }

    private void sendCommand(Channel channel, String command) throws IOException, JSchException {

        this.channelExec = (ChannelExec) channel;
        this.channelExec.setCommand(command);
        //channel.setInputStream(null);
        channel.setOutputStream(System.out);
        this.is = channel.getInputStream();
        channel.connect();
        byte[] buffer = new byte[1024];
        while (channel.getExitStatus() == -1) {
            //System.out.println("Here 1.1");
            while (is.available() > 0) {
                //System.out.println("Here 1.2");
                int i = is.read(buffer, 0, 1024);
                //System.out.println("i= " + i);
                if (i < 0) {
                    System.out.println("breaking");
                    break;
                }
                String string = new String(buffer, 0, i);

                output = output.concat(string);
                //System.out.println("output= "+output);
                if (string.contains("Command authorization failed")) {
                    channel.disconnect();
                    break;
                }

            }

            if (channel.isClosed()) {
                System.out.println("exit-status: " + channel.getExitStatus());
                break;
            }

        }
        is.close();
        channel.disconnect();
        this.session.disconnect();

    }

    private void connect() throws JSchException {
        JSch jsch = new JSch();
        session = jsch.getSession(creds.getUser(), creds.getHostName(), 22);
        session.setTimeout(0);
        session.setPassword(creds.getPassword());
        Properties config = new Properties();
        config.put("StrictHostKeyChecking", "no");
        session.setConfig(config);
        session.setUserInfo(ui);
        session.connect();
        System.out.println("Connected in SSH");
    }

    public static class SSHWorker implements Callable {

        private LoginCredentials lc;
        private Task t;
        private int thread;

        public SSHWorker(LoginCredentials creds, Task task, int thread) {
            this.t = task;
            lc = creds;
            this.thread = thread;
            System.out.println("Creds= " + lc.getHostName() + " Task= " + t + " Thread-" + thread);
        }

        @Override
        public Task call() throws JSchException, IOException {
            System.out.println("Doing Call For: Creds= " + lc.getHostName() + " Task= " + t + " Thread-" + thread);
            String enablepassword1 = (String) lc.getEnablePassword().get(0);
            SSH ssh = new SSH(lc, t.getSSHCommand(enablepassword1));
            this.t.setTaskResult(ssh.getOutput());
            return t;
        }
    }
}

输出在这里(IP地址已更改)

在创建任务的循环中,您只使用了 LoginCredentials 的一个实例。所以所有任务共享实例。在循环中,您在每次迭代中覆盖主机名。所以最后 LoginCredentials 指的是最后一个任务的主机名。所以所有任务都连接到该主机。

只是为了不通过 LoginCredentials 传递主机名并在创建 JSch 会话时直接使用 task.getFirewall().getAddress():

SSH ssh = new SSH(lc, t.getSSHCommand(enablepassword1), task.getFirewall().getAddress());

...

public SSH(LoginCredentials creds, String command, String hostname) 
{
    ...
    this.connect(hostname);
}

...

private void connect(String hostname) throws JSchException {
    JSch jsch = new JSch();
    session = jsch.getSession(creds.getUser(), hostname, 22);
    ...
}