Spring 批处理:配置服务激活器
Spring Batch: Configuring service-activator
我正在尝试在我的 Spring Batch XML 配置文件中添加一个服务激活器(service-activator),以便在我的转换器方法 FileMessageToJobRequestTransformer 上设置 requires-reply=false。
这是 XML:
<!-- Polling transformer endpoint: reads from inboundFileChannel, calls transform(...)
     on the inner bean and sends the result to outboundJobRequestChannel. -->
<int:transformer id="fileMessageToJobRequestTransformer"
                 input-channel="inboundFileChannel"
                 output-channel="outboundJobRequestChannel"
                 method="transform">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <!-- Poll the input channel at a fixed rate of 1000 (milliseconds by default). -->
    <int:poller fixed-rate="1000"/>
</int:transformer>

<!-- Second endpoint declared on the same input/output channels; its ref is the id
     of the transformer endpoint above, not of a standalone service bean. -->
<int:service-activator requires-reply="false"
                       input-channel="inboundFileChannel"
                       output-channel="outboundJobRequestChannel"
                       ref="fileMessageToJobRequestTransformer"/>
这是堆栈跟踪:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#1' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#1': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void 
org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:175)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1584)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:253)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 23 more
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setBeanClassLoader(java.lang.ClassLoader), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setAdviceChain(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver), public final void org.springframework.integration.endpoint.AbstractEndpoint.stop(java.lang.Runnable), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTaskExecutor(java.util.concurrent.Executor), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), 
public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
at org.springframework.util.Assert.isNull(Assert.java:92)
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:446)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:192)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:136)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:131)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:57)
at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
at org.springframework.integration.config.ServiceActivatorFactoryBean.createMethodInvokingHandler(ServiceActivatorFactoryBean.java:57)
at org.springframework.integration.config.AbstractStandardMessageHandlerFactoryBean.createHandler(AbstractStandardMessageHandlerFactoryBean.java:97)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:116)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:104)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:46)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:168)
... 28 more
Process finished with exit code 1
更新:
/**
 * Converts an inbound file message into a request to launch the configured batch job.
 *
 * <p>The file's absolute path is passed to the job as a {@code file://} URL under the
 * job parameter named by {@code fileParameterName}.
 *
 * <p>NOTE: this method may return {@code null}, i.e. produce no reply. The endpoint that
 * invokes it must tolerate a missing reply (for example a service-activator configured
 * with {@code requires-reply="false"}).
 *
 * @param message message whose payload is the file to process
 * @return the job launch request, or {@code null} when the file is skipped because its
 *         path contains ".gitignore" or a regular file with the same name already exists
 *         under the relative {@code bai/archive/} directory
 */
@Transformer
public JobLaunchRequest transform(Message<File> message) {
    LOGGER.debug("File message: {}", message);

    File inputFile = message.getPayload();
    String absolutePath = inputFile.getAbsolutePath();
    if (absolutePath.contains(".gitignore")) {
        return null;
    }

    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(fileParameterName, "file://" + absolutePath);
    LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());

    // If the file exists in the bai/archive directory, assume it has already been
    // processed and skip it. This is not as reliable as querying the Spring Batch
    // table batch_execution_params, which will be done next.
    // File.getName() is used instead of splitting on '/' so the check does not depend
    // on the platform's path separator.
    File archivedCopy = new File("bai/archive/", inputFile.getName());
    if (archivedCopy.exists() && !archivedCopy.isDirectory()) {
        return null;
    }

    // Constructing the request does not launch the job, so there is nothing to catch
    // here; launch failures (e.g. JobInstanceAlreadyCompleteException) surface in the
    // component that actually runs the job.
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
SftpBaiParserJobBridge-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Bridges the SFTP Spring Integration flow to the BAI parse Spring Batch job. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/batch-integration
           http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    <!-- Carries job launch requests from the transformer to the job-launching gateway. -->
    <int:channel id="outboundJobRequestChannel"/>

    <!-- Queue channel for gateway replies; every message is also wire-tapped to "logger". -->
    <int:channel id="jobLaunchReplyChannel">
        <int:queue/>
        <int:interceptors>
            <int:wire-tap channel="logger"/>
        </int:interceptors>
    </int:channel>

    <!-- Turns each new BAI file message into a Spring Batch job launch request. -->
    <int:transformer id="fileMessageToJobRequestTransformer"
                     input-channel="inboundFileChannel"
                     output-channel="outboundJobRequestChannel"
                     method="transform">
        <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
            <property name="job" ref="baiParseJob"/>
            <property name="fileParameterName" value="input.file.url"/>
        </bean>
        <!-- Poll at a fixed rate of 60000, taking at most 100 messages per poll. -->
        <int:poller fixed-rate="60000" max-messages-per-poll="100"/>
    </int:transformer>

    <!-- Launches the BAI parse job for each request and sends the reply to jobLaunchReplyChannel. -->
    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                     reply-channel="jobLaunchReplyChannel"/>

    <!-- Target of the wire-tap above: logs messages at WARN level. -->
    <int:logging-channel-adapter id="logger"
                                 level="WARN"
                                 logger-name="com.distributedfinance.mbi.bai.job"/>
</beans>
SftpInboundReceive-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Inbound side of the flow: receives BAI files via SFTP and publishes them as messages. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/sftp
           http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

    <!-- Imported context; the adapter below references sftpSessionFactory,
         presumably defined there (not visible in this file). -->
    <import resource="SftpSession-context.xml"/>

    <!-- Queue-backed channel, so downstream consumers must poll it. -->
    <int:channel id="inboundFileChannel">
        <int:queue/>
    </int:channel>

    <!-- Copies remote files into the local directory and emits them on inboundFileChannel.
         Directories, cron schedule and remote-delete behaviour come from bai.sftp.* properties. -->
    <int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
                                      auto-startup="true"
                                      channel="inboundFileChannel"
                                      session-factory="sftpSessionFactory"
                                      local-directory="${bai.sftp.local-dir}"
                                      remote-directory="${bai.sftp.remote-dir}"
                                      auto-create-local-directory="true"
                                      preserve-timestamp="true"
                                      delete-remote-files="${bai.sftp.delete}">
        <int:poller cron="${bai.sftp.cron}" max-messages-per-poll="10"/>
    </int-sftp:inbound-channel-adapter>
</beans>
BaiParserJob.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Batch job definition for parsing BAI files and archiving them afterwards. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <!-- The util schema location above includes the /util/ path segment; the shorter
         .../schema/spring-util.xsd form is not the published location of that schema. -->

    <bean name="jobParametersIncrementer"
          class="org.springframework.batch.core.launch.support.RunIdIncrementer"/>

    <!-- BATCH-2351 workaround -->
    <bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
        <property name="autoProxy" value="true"/>
    </bean>

    <!-- Two-step job: parse the BAI file, then archive it. -->
    <batch:job id="baiParseJob" incrementer="jobParametersIncrementer">
        <batch:step id="baiParseStep" next="baiArchive">
            <batch:tasklet transaction-manager="transactionManager">
                <!-- commit-interval="1": one item per chunk/transaction. -->
                <batch:chunk reader="baiItemReader"
                             processor="baiItemProcessor"
                             writer="baiItemWriter"
                             commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="baiArchive">
            <batch:tasklet ref="fileArchivingTasklet"/>
        </batch:step>
    </batch:job>

    <!-- Step-scoped reader that wraps flatFileItemReader and is given one field-set
         mapper per BAI record kind (file, group, account, transaction). -->
    <bean id="baiItemReader"
          class="com.distributedfinance.mbi.bai.reader.MultiLineBaiItemReader"
          scope="step">
        <property name="delegate" ref="flatFileItemReader"/>
        <property name="baiFileFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiFileFieldSetMapper"/>
        </property>
        <property name="baiGroupFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiGroupFieldSetMapper"/>
        </property>
        <property name="baiAccountFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiAccountFieldSetMapper">
                <property name="parser">
                    <bean class="com.distributedfinance.mbi.bai.mapper.BaiTypeParser"/>
                </property>
            </bean>
        </property>
        <property name="baiTransactionFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiTransactionFieldSetMapper"/>
        </property>
    </bean>

    <!-- Step-scoped so the resource can be late-bound from the 'input.file.url' job parameter. -->
    <bean id="flatFileItemReader"
          class="org.springframework.batch.item.file.FlatFileItemReader"
          scope="step">
        <property name="resource" value="#{jobParameters['input.file.url']}"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Processor receives the account lookup and a yyMMddHHmmss date format. -->
    <bean id="baiItemProcessor" class="com.distributedfinance.mbi.bai.processor.BaiItemProcessor">
        <constructor-arg index="0" ref="accountLookup"/>
        <constructor-arg index="1">
            <bean class="java.text.SimpleDateFormat">
                <constructor-arg value="yyMMddHHmmss"/>
            </bean>
        </constructor-arg>
    </bean>

    <bean id="baiItemWriter" class="com.distributedfinance.mbi.bai.writer.BaiItemWriter"/>

    <bean id="accountLookup" class="com.distributedfinance.mbi.bai.lookup.AccountLookup"/>

    <!-- Archiving tasklet; archive directory and purge age come from bai.sftp.* properties. -->
    <bean id="fileArchivingTasklet" class="com.distributedfinance.mbi.bai.tasklet.FileArchivingTasklet">
        <property name="downloadFileKey" value="input.file.url"/>
        <property name="archiveDirectory" value="${bai.sftp.archive-dir}"/>
        <property name="purgeDays" value="${bai.sftp.purge-days}"/>
    </bean>
</beans>
您不能从 service-activator 引用 fileMessageToJobRequestTransformer bean;框架不知道该调用哪个方法。实际的转换器 bean 被包装在一个消费者(consumer)bean 中。
看起来您根本不需要服务激活器——您已经通过 input-channel="inboundFileChannel" 将转换器订阅到了该通道。
按照您的配置,该通道上有两个消费者:转换器和服务激活器。
编辑
转换器(transformer)总是被期望返回回复;由于您的类可能返回也可能不返回回复,严格来说它并不是真正的转换器,而是一个服务。
只需使用:
<!-- Same inner bean and channels as the transformer, declared as a service activator
     so that a missing reply from transform(...) is allowed (requires-reply="false"). -->
<int:service-activator id="fileMessageToJobRequestTransformer"
                       input-channel="inboundFileChannel"
                       output-channel="outboundJobRequestChannel"
                       requires-reply="false"
                       method="transform">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <!-- Poll the input channel at a fixed rate of 1000 (milliseconds by default). -->
    <int:poller fixed-rate="1000"/>
</int:service-activator>
我正在尝试在我的 Spring Batch XML 配置文件中添加一个服务激活器(service-activator),以便在我的转换器方法 FileMessageToJobRequestTransformer 上设置 requires-reply=false。
这是 XML:
<!-- Polling transformer endpoint: reads from inboundFileChannel, calls transform(...)
     on the inner bean and sends the result to outboundJobRequestChannel. -->
<int:transformer id="fileMessageToJobRequestTransformer"
                 input-channel="inboundFileChannel"
                 output-channel="outboundJobRequestChannel"
                 method="transform">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <!-- Poll the input channel at a fixed rate of 1000 (milliseconds by default). -->
    <int:poller fixed-rate="1000"/>
</int:transformer>

<!-- Second endpoint declared on the same input/output channels; its ref is the id
     of the transformer endpoint above, not of a standalone service bean. -->
<int:service-activator requires-reply="false"
                       input-channel="inboundFileChannel"
                       output-channel="outboundJobRequestChannel"
                       ref="fileMessageToJobRequestTransformer"/>
这是堆栈跟踪:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#1' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#1': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void 
org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:175)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1584)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:253)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 23 more
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setBeanClassLoader(java.lang.ClassLoader), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setAdviceChain(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver), public final void org.springframework.integration.endpoint.AbstractEndpoint.stop(java.lang.Runnable), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTaskExecutor(java.util.concurrent.Executor), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), 
public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
at org.springframework.util.Assert.isNull(Assert.java:92)
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:446)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:192)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:136)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:131)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:57)
at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
at org.springframework.integration.config.ServiceActivatorFactoryBean.createMethodInvokingHandler(ServiceActivatorFactoryBean.java:57)
at org.springframework.integration.config.AbstractStandardMessageHandlerFactoryBean.createHandler(AbstractStandardMessageHandlerFactoryBean.java:97)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:116)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:104)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:46)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:168)
... 28 more
Process finished with exit code 1
更新:
/**
 * Converts an inbound file message into a request to launch the configured batch job.
 *
 * <p>The file's absolute path is passed to the job as a {@code file://} URL under the
 * job parameter named by {@code fileParameterName}.
 *
 * <p>NOTE: this method may return {@code null}, i.e. produce no reply. The endpoint that
 * invokes it must tolerate a missing reply (for example a service-activator configured
 * with {@code requires-reply="false"}).
 *
 * @param message message whose payload is the file to process
 * @return the job launch request, or {@code null} when the file is skipped because its
 *         path contains ".gitignore" or a regular file with the same name already exists
 *         under the relative {@code bai/archive/} directory
 */
@Transformer
public JobLaunchRequest transform(Message<File> message) {
    LOGGER.debug("File message: {}", message);

    File inputFile = message.getPayload();
    String absolutePath = inputFile.getAbsolutePath();
    if (absolutePath.contains(".gitignore")) {
        return null;
    }

    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString(fileParameterName, "file://" + absolutePath);
    LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());

    // If the file exists in the bai/archive directory, assume it has already been
    // processed and skip it. This is not as reliable as querying the Spring Batch
    // table batch_execution_params, which will be done next.
    // File.getName() is used instead of splitting on '/' so the check does not depend
    // on the platform's path separator.
    File archivedCopy = new File("bai/archive/", inputFile.getName());
    if (archivedCopy.exists() && !archivedCopy.isDirectory()) {
        return null;
    }

    // Constructing the request does not launch the job, so there is nothing to catch
    // here; launch failures (e.g. JobInstanceAlreadyCompleteException) surface in the
    // component that actually runs the job.
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
SftpBaiParserJobBridge-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Bridges the SFTP Spring Integration flow to the BAI parse Spring Batch job. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/batch-integration
           http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    <!-- Carries job launch requests from the transformer to the job-launching gateway. -->
    <int:channel id="outboundJobRequestChannel"/>

    <!-- Queue channel for gateway replies; every message is also wire-tapped to "logger". -->
    <int:channel id="jobLaunchReplyChannel">
        <int:queue/>
        <int:interceptors>
            <int:wire-tap channel="logger"/>
        </int:interceptors>
    </int:channel>

    <!-- Turns each new BAI file message into a Spring Batch job launch request. -->
    <int:transformer id="fileMessageToJobRequestTransformer"
                     input-channel="inboundFileChannel"
                     output-channel="outboundJobRequestChannel"
                     method="transform">
        <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
            <property name="job" ref="baiParseJob"/>
            <property name="fileParameterName" value="input.file.url"/>
        </bean>
        <!-- Poll at a fixed rate of 60000, taking at most 100 messages per poll. -->
        <int:poller fixed-rate="60000" max-messages-per-poll="100"/>
    </int:transformer>

    <!-- Launches the BAI parse job for each request and sends the reply to jobLaunchReplyChannel. -->
    <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                     reply-channel="jobLaunchReplyChannel"/>

    <!-- Target of the wire-tap above: logs messages at WARN level. -->
    <int:logging-channel-adapter id="logger"
                                 level="WARN"
                                 logger-name="com.distributedfinance.mbi.bai.job"/>
</beans>
SftpInboundReceive-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Inbound side of the flow: receives BAI files via SFTP and publishes them as messages. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/sftp
           http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

    <!-- Imported context; the adapter below references sftpSessionFactory,
         presumably defined there (not visible in this file). -->
    <import resource="SftpSession-context.xml"/>

    <!-- Queue-backed channel, so downstream consumers must poll it. -->
    <int:channel id="inboundFileChannel">
        <int:queue/>
    </int:channel>

    <!-- Copies remote files into the local directory and emits them on inboundFileChannel.
         Directories, cron schedule and remote-delete behaviour come from bai.sftp.* properties. -->
    <int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
                                      auto-startup="true"
                                      channel="inboundFileChannel"
                                      session-factory="sftpSessionFactory"
                                      local-directory="${bai.sftp.local-dir}"
                                      remote-directory="${bai.sftp.remote-dir}"
                                      auto-create-local-directory="true"
                                      preserve-timestamp="true"
                                      delete-remote-files="${bai.sftp.delete}">
        <int:poller cron="${bai.sftp.cron}" max-messages-per-poll="10"/>
    </int-sftp:inbound-channel-adapter>
</beans>
BaiParserJob.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring Batch job definition for parsing BAI files and archiving them afterwards. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd">
    <!-- The util schema location above includes the /util/ path segment; the shorter
         .../schema/spring-util.xsd form is not the published location of that schema. -->

    <bean name="jobParametersIncrementer"
          class="org.springframework.batch.core.launch.support.RunIdIncrementer"/>

    <!-- BATCH-2351 workaround -->
    <bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
        <property name="autoProxy" value="true"/>
    </bean>

    <!-- Two-step job: parse the BAI file, then archive it. -->
    <batch:job id="baiParseJob" incrementer="jobParametersIncrementer">
        <batch:step id="baiParseStep" next="baiArchive">
            <batch:tasklet transaction-manager="transactionManager">
                <!-- commit-interval="1": one item per chunk/transaction. -->
                <batch:chunk reader="baiItemReader"
                             processor="baiItemProcessor"
                             writer="baiItemWriter"
                             commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="baiArchive">
            <batch:tasklet ref="fileArchivingTasklet"/>
        </batch:step>
    </batch:job>

    <!-- Step-scoped reader that wraps flatFileItemReader and is given one field-set
         mapper per BAI record kind (file, group, account, transaction). -->
    <bean id="baiItemReader"
          class="com.distributedfinance.mbi.bai.reader.MultiLineBaiItemReader"
          scope="step">
        <property name="delegate" ref="flatFileItemReader"/>
        <property name="baiFileFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiFileFieldSetMapper"/>
        </property>
        <property name="baiGroupFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiGroupFieldSetMapper"/>
        </property>
        <property name="baiAccountFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiAccountFieldSetMapper">
                <property name="parser">
                    <bean class="com.distributedfinance.mbi.bai.mapper.BaiTypeParser"/>
                </property>
            </bean>
        </property>
        <property name="baiTransactionFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiTransactionFieldSetMapper"/>
        </property>
    </bean>

    <!-- Step-scoped so the resource can be late-bound from the 'input.file.url' job parameter. -->
    <bean id="flatFileItemReader"
          class="org.springframework.batch.item.file.FlatFileItemReader"
          scope="step">
        <property name="resource" value="#{jobParameters['input.file.url']}"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Processor receives the account lookup and a yyMMddHHmmss date format. -->
    <bean id="baiItemProcessor" class="com.distributedfinance.mbi.bai.processor.BaiItemProcessor">
        <constructor-arg index="0" ref="accountLookup"/>
        <constructor-arg index="1">
            <bean class="java.text.SimpleDateFormat">
                <constructor-arg value="yyMMddHHmmss"/>
            </bean>
        </constructor-arg>
    </bean>

    <bean id="baiItemWriter" class="com.distributedfinance.mbi.bai.writer.BaiItemWriter"/>

    <bean id="accountLookup" class="com.distributedfinance.mbi.bai.lookup.AccountLookup"/>

    <!-- Archiving tasklet; archive directory and purge age come from bai.sftp.* properties. -->
    <bean id="fileArchivingTasklet" class="com.distributedfinance.mbi.bai.tasklet.FileArchivingTasklet">
        <property name="downloadFileKey" value="input.file.url"/>
        <property name="archiveDirectory" value="${bai.sftp.archive-dir}"/>
        <property name="purgeDays" value="${bai.sftp.purge-days}"/>
    </bean>
</beans>
您不能从 service-activator 引用 fileMessageToJobRequestTransformer bean;框架不知道该调用哪个方法。实际的转换器 bean 被包装在一个消费者(consumer)bean 中。
看起来您根本不需要服务激活器——您已经通过 input-channel="inboundFileChannel" 将转换器订阅到了该通道。
按照您的配置,该通道上有两个消费者:转换器和服务激活器。
编辑
转换器(transformer)总是被期望返回回复;由于您的类可能返回也可能不返回回复,严格来说它并不是真正的转换器,而是一个服务。
只需使用:
<!-- Same inner bean and channels as the transformer, declared as a service activator
     so that a missing reply from transform(...) is allowed (requires-reply="false"). -->
<int:service-activator id="fileMessageToJobRequestTransformer"
                       input-channel="inboundFileChannel"
                       output-channel="outboundJobRequestChannel"
                       requires-reply="false"
                       method="transform">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <!-- Poll the input channel at a fixed rate of 1000 (milliseconds by default). -->
    <int:poller fixed-rate="1000"/>
</int:service-activator>