Sftp File Upload Fails

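I developed a scheduler using Spring Boot. The scheduler creates text files on the local VM and uploads them to a remote FTP location. It runs in five time slots per day, and the last slot is at 11:45 PM. The problem is that the upload fails in the 11:45 PM slot, although the files are still created in the local location. The log is included here: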

2018-10-19 00:00:26.338 ERROR --- [task-scheduler-4] ework.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Error handling message for file [/apps/logs/lesipay-scheduler/to_ctf/ClientCreation18102018_23.txt -> ClientCreation18102018_23.txt]; nested exception is org.springframework.messaging.MessagingException: Failed to write to '/logs/dumpfiles/to_ctf/ClientCreation18102018_23.txt.writing' while uploading the file; nested exception is org.springframework.core.NestedIOException: failed to write file; nested exception is 4: java.io.IOException: inputstream is closed, failedMessage=GenericMessage [payload=/apps/logs/lesipay-scheduler/to_ctf/ClientCreation18102018_23.txt, headers={id=e914c7a2-2b4c-74e2-92d6-b8158ed72874, timestamp=1539886500712}]
        at org.springframework.integration.file.remote.RemoteFileTemplate.doInSession(RemoteFileTemplate.java:321)
        at org.springframework.integration.file.remote.RemoteFileTemplate.doInSession(RemoteFileTemplate.java:283)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:435)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:283)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:273)
        at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:265)
        at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:170)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
        at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
        at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
        at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
        at com.sun.proxy.$Proxy72.upload(Unknown Source)
        at lk.dialog.lesipayscheduler.scheduler.LesipayFileGenScheduledTask.writeToSftpFile(LesipayFileGenScheduledTask.java:184)
        at lk.dialog.lesipayscheduler.scheduler.LesipayFileGenScheduledTask.setRepoParams(LesipayFileGenScheduledTask.java:132)
        at lk.dialog.lesipayscheduler.scheduler.LesipayFileGenScheduledTask.fileGeneratorFor21Hr(LesipayFileGenScheduledTask.java:120)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.springframework.messaging.MessagingException: Failed to write to '/logs/dumpfiles/to_ctf/ClientCreation18102018_23.txt.writing' while uploading the file; nested exception is org.springframework.core.NestedIOException: failed to write file; nested exception is 4: java.io.IOException: inputstream is closed
        at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:558)
        at org.springframework.integration.file.remote.RemoteFileTemplate.access$500(RemoteFileTemplate.java:60)
        at org.springframework.integration.file.remote.RemoteFileTemplate.doInSession(RemoteFileTemplate.java:306)
        ... 46 more
    Caused by: org.springframework.core.NestedIOException: failed to write file; nested exception is 4: java.io.IOException: inputstream is closed
        at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:159)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.write(CachingSessionFactory.java:228)
        at org.springframework.integration.file.remote.RemoteFileTemplate.sendFileToRemoteDirectory(RemoteFileTemplate.java:530)
        ... 48 more
    Caused by: 4: java.io.IOException: inputstream is closed
        at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:697)
        at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:540)
        at com.jcraft.jsch.ChannelSftp.put(ChannelSftp.java:492)
        at org.springframework.integration.sftp.session.SftpSession.write(SftpSession.java:156)
        ... 50 more
    Caused by: java.io.IOException: inputstream is closed
        at com.jcraft.jsch.ChannelSftp.fill(ChannelSftp.java:2911)
        at com.jcraft.jsch.ChannelSftp.header(ChannelSftp.java:2935)
        at com.jcraft.jsch.ChannelSftp._put(ChannelSftp.java:583)
        ... 53 more

File upload code snippet

private void writeToSftpFile(List<String> stringList, String key, String specialCategory) {
    String outFileName = createSftpDirectory(key,specialCategory);
    if (stringList.size() > 0) {
        try (PrintStream ps = new PrintStream(outFileName)) {
            for (String str : stringList) {
                ps.println(str);
            }
            pathList.add(outFileName);
            logger.info("{} created.", outFileName);
        } catch (IOException e) {
            logger.error("Couldn't write file -> {}", outFileName, e);
        }
    } else {
        logger.info("No record for ->{}", outFileName);
    }

    if (key.equals("facilityKey")) {
        for (String localPath : pathList) {
            logger.info("localpath -> {}", localPath);
            uploadGateway.upload(new File(localPath));
        }
        pathList = new ArrayList<>();
    }

    if (specialCategory.equals("facilitySpecial")) {
        for (String localPath : pathList) {
            logger.info("localpath -> {}", localPath);
            uploadGateway.upload(new File(localPath));
        }
        pathList = new ArrayList<>();
    }

}

Upload gateway

@Component
@MessagingGateway
public interface UploadGateway {

    @Gateway(requestChannel = "toSftpChannel")
    void upload(File file);
}

SFTP configuration

@Configuration
public class SftpConfig {

    @Autowired
    private ConfigSource configSource;

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpFileSessionFactory() {
        DefaultSftpSessionFactory ssf = new DefaultSftpSessionFactory(true);
        ssf.setHost(configSource.getSftpHost());
        ssf.setPort(configSource.getSftpPort());
        ssf.setUser(configSource.getSftpUser());
        ssf.setPassword(configSource.getSftpPassword());
        ssf.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(ssf);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpFileSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(configSource.getRemoteDir()));
        handler.setFileNameGenerator(new FileNameGenerator() {
            @Override
            public String generateFileName(Message<?> message) {
                if (message.getPayload() instanceof File) {
                    return ((File) message.getPayload()).getName();
                } else {
                    throw new IllegalArgumentException("File expected as payload.");
                }
            }
        });
        return handler;
    }
}

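One approach:

import pysftp as sftp

def sftpExample():
    try:
        # Host and credentials are placeholders; the connection is closed when the with block exits
        with sftp.Connection('ftp.company.com', username='enterusername', password='enterpassword') as s:
            remotepath1 = '/home/filepath/sample.xlsx'
            # Raw string keeps the backslashes of the UNC path intact
            localpath1 = r"\\corporatenetwork\datapath\sample.xlsx"

            # Download the remote file, preserving its modification time
            s.get(remotepath1, localpath1, preserve_mtime=True)

    except Exception as e:
        print(str(e))

sftpExample()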

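After some monitoring with the network team, I found the root cause of this problem.

Long-lived SSH sessions were being closed arbitrarily, and there was no proper mechanism for handling the SSH session (closing the session after using it).

After handling session creation and closing properly, we were able to resolve the issue.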
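
The answer does not show the changed code, so the following is only a sketch of one way the session handling could look, not the fix that was actually applied. It assumes the same JSch-based Spring Integration version and the same `ConfigSource` class as in the question, and it replaces the session factory bean so that the SSH session is neither shared nor cached: each upload then opens its own session, and the message handler closes it when the transfer ends. The class name `SftpSessionConfig` is hypothetical.

```java
import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class SftpSessionConfig {

    // ConfigSource is the question's own settings holder, reused here as-is.
    @Autowired
    private ConfigSource configSource;

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpFileSessionFactory() {
        // false: do not keep one long-lived SSH session shared across uploads
        DefaultSftpSessionFactory ssf = new DefaultSftpSessionFactory(false);
        ssf.setHost(configSource.getSftpHost());
        ssf.setPort(configSource.getSftpPort());
        ssf.setUser(configSource.getSftpUser());
        ssf.setPassword(configSource.getSftpPassword());
        ssf.setAllowUnknownKeys(true);
        // Assumption: returning the factory without a CachingSessionFactory
        // wrapper, so closing a session really disconnects it instead of
        // putting it back into a pool where it can go stale overnight.
        return ssf;
    }
}
```

The trade-off is an SSH handshake for every upload, which is usually acceptable for a job that runs five times a day. If session caching has to stay, it is worth checking the Spring Integration reference for the version in use, since later releases document options for validating cached sessions before they are handed out.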