Spring 文件集成:使用 FileWritingMessageHandler 写入文件时出错 FileSystemException

Spring file integration: Error FileSystemException while writing File using FileWritingMessageHandler

我创建了一个流程,它读取两个文件作为输入并将它们复制到 'work' 目录,然后将它们转换为 JobRequest。有效载荷是一个 JobExecution,它包含两个文件作为参数。所以,我启动批处理,然后我想从 jobExecution 中获取两个文件,并将它们转换为两个带有 Split 的 GenericMessage。最后,我 运行 一个将这些文件写入另一个 'archive' 目录的子流程。 程序产生异常:

2022-04-25 14:56:39.974 ERROR 4556 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: failed to write Message payload to file in the [bean 'startBatchFlow.subFlow#2.file:outbound-gateway#0' for component 'startBatchFlow.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [/config/FilesReaderIntegrationConfig.class]'; from source: 'bean method startBatchFlow']; nested exception is java.nio.file.FileSystemException: C:\dev\Fichiers\test\work\Fonds.xlsx -> C:\dev\Fichiers\test\archive\Fonds.xlsx: Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus, failedMessage=GenericMessage [payload=C:\dev\Fichiers\test\work\Fonds.xlsx, headers={sequenceNumber=1, file_name=Fonds.xlsx, sequenceSize=0, correlationId=14ecf13d-c99f-eff1-ae60-31400d606c2d, file_originalFile=C:\dev\Fichiers\test\work\Fonds.xlsx, id=16c09968-1833-d430-1542-ea1959364565, file_relativePath=Fonds.xlsx, timestamp=1650889605032}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.file.FileWritingMessageHandler.handleRequestMessage(FileWritingMessageHandler.java:543)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)

这是我的代码:

    @Bean
    public IntegrationFlow startBatchFlow() throws InterruptedException {
        return IntegrationFlows //
                .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000)))//
                .handle(fileWritingMessageHandler())
                .aggregate(agg -> agg.correlationExpression("payload != null")
                        .releaseStrategy(new ExpressionEvaluatingReleaseStrategy("size == 2"))
                )
                .transform(fileMessageToJobRequest()) //
                .handle(jobLaunchingMessageHandler()) //
                .wireTap(sf -> {
                    sf.handle(jobExecution ->
                            System.out.println("job Execution payload" + jobExecution.getPayload()));
                })
                .split(new MessageSplitter())
                .wireTap(sf -> {
                    sf.handle(jobExecution ->
                            System.out.println("Message after split payload =" + jobExecution.getPayload()));
                })
                .route(Message.class, message -> message.getPayload() != null,
                        mapping -> mapping.subFlowMapping(true, archiveFilesFlow())
                )
                .get();
    }

    public IntegrationFlow archiveFilesFlow() {
        return f -> f
                .handle(fileWritingInArchive())
                .handle(message -> {
                    System.out.println("here in archive flow" + message.getPayload());
                });
    }

    public FileWritingMessageHandler fileWritingInArchive() {
        FileWritingMessageHandler fileWritingMessageHandler = new FileWritingMessageHandler(new File(archivePath));
        fileWritingMessageHandler.setDeleteSourceFiles(true);
        fileWritingMessageHandler.setExpectReply(true);
        return fileWritingMessageHandler;
    }

和 MessageSplitter:

@Service
public class MessageSplitter extends AbstractMessageSplitter {
    
    @Override
    protected Object splitMessage(Message<?> message) {
        List<File> messages = new ArrayList();
        ((JobExecution) message.getPayload()).getJobParameters().getParameters().forEach((key, jobParam) -> {
            messages.add(new File((String) jobParam.getValue()));
        });
        Iterator<MessageBuilder<File>> builderIterator = new Iterator<MessageBuilder<File>>() {
            private File next;
            private int index = 0;

            @Override
            public boolean hasNext() {
                if (this.next != null) { // handle multiple hasNext() calls.
                    return true;
                }
                while (index < messages.size()) {
                    this.next = messages.get(index);
                    index++;
                    if (index == messages.size()) {
                        return false;
                    }
                    return true;
                }
                return false;
            }

            @Override
            public MessageBuilder<File> next() {
                File message = this.next;
                this.next = null;
                return MessageBuilder
                        .withPayload(message).setHeader("file_name", message.getName())
                        .setHeader("file_originalFile",message)
                        .setHeader("file_relativePath",message.getName());

            }
        };
        return builderIterator;
    }
}

提前致谢。

您的日志中存在异常:

Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus

法语翻译为:

The process cannot access the file because this file is in use by another process

所以,你的 C:\dev\Fichiers\test\work\Fonds.xlsx 在某处打开了,但没有关闭。

也许您的工作是对文件内容进行处理,但最终并未将其关闭。 Unix-type 操作系统是宽容的,但是 Windows 不允许对文件做任何事情,直到它的资源没有在其他进程中关闭。