Spring Integration file: FileSystemException while writing a file using FileWritingMessageHandler
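I created a flow that reads two files as input and copies them to a 'work' directory, then transforms them into a JobRequest. The payload is a JobExecution that contains the two files as parameters. So I launch the batch, and then I want to take the two files from the JobExecution and turn them into two GenericMessages with a splitter. Finally, I run a subflow that writes these files to another 'archive' directory.
The program throws this exception: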
2022-04-25 14:56:39.974 ERROR 4556 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: failed to write Message payload to file in the [bean 'startBatchFlow.subFlow#2.file:outbound-gateway#0' for component 'startBatchFlow.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [/config/FilesReaderIntegrationConfig.class]'; from source: 'bean method startBatchFlow']; nested exception is java.nio.file.FileSystemException: C:\dev\Fichiers\test\work\Fonds.xlsx -> C:\dev\Fichiers\test\archive\Fonds.xlsx: Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus, failedMessage=GenericMessage [payload=C:\dev\Fichiers\test\work\Fonds.xlsx, headers={sequenceNumber=1, file_name=Fonds.xlsx, sequenceSize=0, correlationId=14ecf13d-c99f-eff1-ae60-31400d606c2d, file_originalFile=C:\dev\Fichiers\test\work\Fonds.xlsx, id=16c09968-1833-d430-1542-ea1959364565, file_relativePath=Fonds.xlsx, timestamp=1650889605032}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.file.FileWritingMessageHandler.handleRequestMessage(FileWritingMessageHandler.java:543)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
Here is my code:
@Bean
public IntegrationFlow startBatchFlow() throws InterruptedException {
    return IntegrationFlows //
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000))) //
            .handle(fileWritingMessageHandler())
            .aggregate(agg -> agg.correlationExpression("payload != null")
                    .releaseStrategy(new ExpressionEvaluatingReleaseStrategy("size == 2"))
            )
            .transform(fileMessageToJobRequest()) //
            .handle(jobLaunchingMessageHandler()) //
            .wireTap(sf -> {
                sf.handle(jobExecution ->
                        System.out.println("job Execution payload" + jobExecution.getPayload()));
            })
            .split(new MessageSplitter())
            .wireTap(sf -> {
                sf.handle(jobExecution ->
                        System.out.println("Message after split payload =" + jobExecution.getPayload()));
            })
            .route(Message.class, message -> message.getPayload() != null,
                    mapping -> mapping.subFlowMapping(true, archiveFilesFlow())
            )
            .get();
}

public IntegrationFlow archiveFilesFlow() {
    return f -> f
            .handle(fileWritingInArchive())
            .handle(message -> {
                System.out.println("here in archive flow" + message.getPayload());
            });
}

public FileWritingMessageHandler fileWritingInArchive() {
    FileWritingMessageHandler fileWritingMessageHandler = new FileWritingMessageHandler(new File(archivePath));
    fileWritingMessageHandler.setDeleteSourceFiles(true);
    fileWritingMessageHandler.setExpectReply(true);
    return fileWritingMessageHandler;
}
And the MessageSplitter:
@Service
public class MessageSplitter extends AbstractMessageSplitter {

    @Override
    protected Object splitMessage(Message<?> message) {
        List<File> messages = new ArrayList<>();
        ((JobExecution) message.getPayload()).getJobParameters().getParameters().forEach((key, jobParam) -> {
            messages.add(new File((String) jobParam.getValue()));
        });
        Iterator<MessageBuilder<File>> builderIterator = new Iterator<MessageBuilder<File>>() {
            private File next;
            private int index = 0;

            @Override
            public boolean hasNext() {
                if (this.next != null) { // handle multiple hasNext() calls.
                    return true;
                }
                while (index < messages.size()) {
                    this.next = messages.get(index);
                    index++;
                    if (index == messages.size()) {
                        return false;
                    }
                    return true;
                }
                return false;
            }

            @Override
            public MessageBuilder<File> next() {
                File message = this.next;
                this.next = null;
                return MessageBuilder
                        .withPayload(message).setHeader("file_name", message.getName())
                        .setHeader("file_originalFile", message)
                        .setHeader("file_relativePath", message.getName());
            }
        };
        return builderIterator;
    }
}
Thanks in advance.
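The exception in your log says:
Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus
Translated from French:
The process cannot access the file because this file is in use by another process
So your C:\dev\Fichiers\test\work\Fonds.xlsx was opened somewhere and never closed.
Perhaps your job processes the file content but never closes the file when it is done. Unix-type operating systems are forgiving about this, but Windows will not let you move or delete a file while something else still holds it open.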
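As a minimal sketch of what "closing the file" means here, assuming the job reads the workbook through a plain java.io stream: the class CloseBeforeMove and the method processThenArchive are hypothetical names for illustration, not part of your code or of Spring. The idea is only that the stream is closed before anything tries to move the file.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CloseBeforeMove {

    // Hypothetical helper: reads the file the way a job step might, then archives it.
    public static Path processThenArchive(Path source, Path archiveDir) throws IOException {
        // try-with-resources closes the stream when the block exits,
        // even if the read throws.
        try (InputStream in = new FileInputStream(source.toFile())) {
            in.read(); // stand-in for the real processing
        }
        // The stream is closed at this point, so no handle from this code
        // is left open when the file is moved.
        Files.createDirectories(archiveDir);
        return Files.move(source, archiveDir.resolve(source.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the 'work' and 'archive' folders.
        Path work = Files.createTempDirectory("work");
        Path archive = Files.createTempDirectory("archive");
        Path file = Files.write(work.resolve("Fonds.xlsx"), new byte[] {1, 2, 3});
        Path moved = processThenArchive(file, archive);
        System.out.println("moved to " + moved);
    }
}
```

If the reader in your batch job opens the file without such a block (or without an explicit close()), that would be the first place I would look.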