Spring 基于文件名的集成流路由
Spring Integration Flow route based on file name
我需要解压一个 zip 文件,并对其中的内容进行处理和归档。zip 文件中可能包含两种类型的文件:个人(individual)或公司(firm),可以通过文件名来区分。所有文件处理完成后,应调用另一个程序模块,并将处理后的文件归档到另一个位置。我想使用 Spring Integration 来实现。我尝试用以下代码实现,但在基于文件名进行路由时出现了问题。我使用的是 JDK 8 和 Spring 5。
.<File, Boolean>route(new Function<File, Boolean>() {
@Override
public Boolean apply(File f) {
return f.getName().contains("individual");
}
}, m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
异常
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.function.Function] for method match: [public default <V> java.util.function.Function<V, R> java.util.function.Function.compose(java.util.function.Function<? super V, ? extends T>), public static <T> java.util.function.Function<T, T> java.util.function.Function.identity(), public java.lang.Boolean com.xxx.thirdpatysystem.config.IntegrationConfig.apply(java.io.File)]
at org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:843) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:362) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.router.MethodInvokingRouter.<init>(MethodInvokingRouter.java:46) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1922) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1895) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
我也尝试了下面的写法
.<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
异常
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'thirdpatysystemFlow' threw exception; nested exception is java.lang.UnsupportedOperationException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:579) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
... 17 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
at org.springframework.integration.dsl.StandardIntegrationFlow.configure(StandardIntegrationFlow.java:89) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2172) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2151) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
完整的代码片段如下
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.zip.splitter.UnZipResultSplitter;
import org.springframework.integration.zip.transformer.UnZipTransformer;
import org.springframework.integration.zip.transformer.ZipResultType;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author dpoddar
*
*/
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Value("${input.directory}")
private String inputDir;
@Value("${outputDir.directory}")
private String outputDir;
@Value("${input.scan.frequency: 100}")
private long scanFrequency;
@Bean
public MessageSource<File> inputFileSource() {
FileReadingMessageSource src = new FileReadingMessageSource();
src.setDirectory(new File(inputDir));
src.setAutoCreateDirectory(true);
ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\.zip$"));
src.setFilter(chainFileListFilter);
return src;
}
@Bean
public UnZipTransformer unZipTransformer() {
UnZipTransformer unZipTransformer = new UnZipTransformer();
unZipTransformer.setExpectSingleResult(false);
unZipTransformer.setZipResultType(ZipResultType.FILE);
//unZipTransformer.setWorkDirectory(new File("/usr/tmp/uncompress"));
unZipTransformer.setDeleteFiles(true);
return unZipTransformer;
}
@Bean
public UnZipResultSplitter splitter() {
UnZipResultSplitter splitter = new UnZipResultSplitter();
return splitter;
}
@Bean
public DirectChannel outputChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler fileOutboundChannelAdapter() {
FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
adapter.setDeleteSourceFiles(true);
adapter.setAutoCreateDirectory(true);
adapter.setExpectReply(false);
return adapter;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
@Autowired
DirectChannel outputChannel;
@Autowired
MessageHandler fileOutboundChannelAdapter;
@Bean
public IntegrationFlow individualProcessor() {
return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
}
@Bean
public IntegrationFlow firmProcessor() {
return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
}
@Bean
public IntegrationFlow thirdpatysystemAgentDemographicFlow() {
return IntegrationFlows
.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
.transform(unZipTransformer())
.split(splitter())
.channel(MessageChannels.executor(taskExecutor()))
.<File, Boolean>route(new Function<File, Boolean>() {
@Override
public Boolean apply(File f) {
return f.getName().contains("individual");
}
}, m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
.aggregate()
/*.handle("thirdpatysystemprocessor","processfile")
.channel(outputChannel())
.handle(fileOutboundChannelAdapter())*/
.get()
;
}
}
java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.function.Function]
这个问题已在 Spring Integration 5.0.5
中修复:https://jira.spring.io/browse/INT-4456。现在,当传入的是显式的 Function
实现(而不是 lambda)时,框架内部会这样处理:
MethodInvokingRouter methodInvokingRouter = isLambda(router)
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
: new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD);
我们明确指向 apply()
方法。
子流 (gateway()
) 中现有 IntegrationFlow
bean 的重用已在版本 5.0.4
中修复:https://jira.spring.io/browse/INT-4434
所以,您只需要将您的项目升级到最新的依赖项即可。特别是 Spring 集成 5.0.7
:https://spring.io/projects/spring-integration#learn
我需要解压一个 zip 文件,并对其中的内容进行处理和归档。zip 文件中可能包含两种类型的文件:个人(individual)或公司(firm),可以通过文件名来区分。所有文件处理完成后,应调用另一个程序模块,并将处理后的文件归档到另一个位置。我想使用 Spring Integration 来实现。我尝试用以下代码实现,但在基于文件名进行路由时出现了问题。我使用的是 JDK 8 和 Spring 5。
.<File, Boolean>route(new Function<File, Boolean>() {
@Override
public Boolean apply(File f) {
return f.getName().contains("individual");
}
}, m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
异常
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.function.Function] for method match: [public default <V> java.util.function.Function<V, R> java.util.function.Function.compose(java.util.function.Function<? super V, ? extends T>), public static <T> java.util.function.Function<T, T> java.util.function.Function.identity(), public java.lang.Boolean com.xxx.thirdpatysystem.config.IntegrationConfig.apply(java.io.File)]
at org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:843) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:362) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.router.MethodInvokingRouter.<init>(MethodInvokingRouter.java:46) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1922) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1895) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
我也尝试了下面的写法
.<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
异常
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'thirdpatysystemFlow' threw exception; nested exception is java.lang.UnsupportedOperationException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:579) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
... 17 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
at org.springframework.integration.dsl.StandardIntegrationFlow.configure(StandardIntegrationFlow.java:89) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2172) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2151) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
完整的代码片段如下
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.zip.splitter.UnZipResultSplitter;
import org.springframework.integration.zip.transformer.UnZipTransformer;
import org.springframework.integration.zip.transformer.ZipResultType;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author dpoddar
*
*/
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Value("${input.directory}")
private String inputDir;
@Value("${outputDir.directory}")
private String outputDir;
@Value("${input.scan.frequency: 100}")
private long scanFrequency;
@Bean
public MessageSource<File> inputFileSource() {
FileReadingMessageSource src = new FileReadingMessageSource();
src.setDirectory(new File(inputDir));
src.setAutoCreateDirectory(true);
ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\.zip$"));
src.setFilter(chainFileListFilter);
return src;
}
@Bean
public UnZipTransformer unZipTransformer() {
UnZipTransformer unZipTransformer = new UnZipTransformer();
unZipTransformer.setExpectSingleResult(false);
unZipTransformer.setZipResultType(ZipResultType.FILE);
//unZipTransformer.setWorkDirectory(new File("/usr/tmp/uncompress"));
unZipTransformer.setDeleteFiles(true);
return unZipTransformer;
}
@Bean
public UnZipResultSplitter splitter() {
UnZipResultSplitter splitter = new UnZipResultSplitter();
return splitter;
}
@Bean
public DirectChannel outputChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler fileOutboundChannelAdapter() {
FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
adapter.setDeleteSourceFiles(true);
adapter.setAutoCreateDirectory(true);
adapter.setExpectReply(false);
return adapter;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
@Autowired
DirectChannel outputChannel;
@Autowired
MessageHandler fileOutboundChannelAdapter;
@Bean
public IntegrationFlow individualProcessor() {
return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
}
@Bean
public IntegrationFlow firmProcessor() {
return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
}
@Bean
public IntegrationFlow thirdpatysystemAgentDemographicFlow() {
return IntegrationFlows
.from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
.transform(unZipTransformer())
.split(splitter())
.channel(MessageChannels.executor(taskExecutor()))
.<File, Boolean>route(new Function<File, Boolean>() {
@Override
public Boolean apply(File f) {
return f.getName().contains("individual");
}
}, m -> m
.subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
.subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
)
.aggregate()
/*.handle("thirdpatysystemprocessor","processfile")
.channel(outputChannel())
.handle(fileOutboundChannelAdapter())*/
.get()
;
}
}
java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.function.Function]
这个问题已在 Spring Integration 5.0.5
中修复:https://jira.spring.io/browse/INT-4456。现在,当传入的是显式的 Function
实现(而不是 lambda)时,框架内部会这样处理:
MethodInvokingRouter methodInvokingRouter = isLambda(router)
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
: new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD);
我们明确指向 apply()
方法。
子流 (gateway()
) 中现有 IntegrationFlow
bean 的重用已在版本 5.0.4
中修复:https://jira.spring.io/browse/INT-4434
所以,您只需要将您的项目升级到最新的依赖项即可。特别是 Spring 集成 5.0.7
:https://spring.io/projects/spring-integration#learn