Spring 批处理:配置服务激活器

Spring Batch: Configuring service-activator

我正在尝试在我的 Spring Batch 应用中添加一个服务激活器(service-activator),目的是通过 XML 配置为我的转换器 FileMessageToJobRequestTransformer 的方法设置 requires-reply=false。

这是 XML:

<!-- Transformer endpoint: polls inboundFileChannel at a fixed rate of 1000, invokes
     transform on the nested bean and sends the result to outboundJobRequestChannel. -->
<int:transformer id="fileMessageToJobRequestTransformer"
                 method="transform"
                 input-channel="inboundFileChannel"
                 output-channel="outboundJobRequestChannel">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <int:poller fixed-rate="1000"/>
</int:transformer>

    <!-- Second consumer on inboundFileChannel. Its ref names the transformer endpoint
         declared above; this is the definition that fails in the stack trace below. -->
    <int:service-activator ref="fileMessageToJobRequestTransformer"
                           input-channel="inboundFileChannel"
                           output-channel="outboundJobRequestChannel"
                           requires-reply="false"/>

这是堆栈跟踪:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#1' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#1': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void 
org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
        at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:175)
        at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1584)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:253)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:196)
        at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
        ... 23 more
    Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Long] for method match: [public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTrigger(org.springframework.scheduling.Trigger), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setErrorHandler(org.springframework.util.ErrorHandler), public void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), public void org.springframework.integration.context.IntegrationObjectSupport.setApplicationContext(org.springframework.context.ApplicationContext) throws org.springframework.beans.BeansException, public void org.springframework.integration.endpoint.AbstractEndpoint.setAutoStartup(boolean), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setMaxMessagesPerPoll(long), public void org.springframework.integration.endpoint.AbstractEndpoint.setTaskScheduler(org.springframework.scheduling.TaskScheduler), public void org.springframework.integration.endpoint.AbstractEndpoint.setPhase(int), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setBeanClassLoader(java.lang.ClassLoader), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setAdviceChain(java.util.List), public void org.springframework.integration.context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver), public final void org.springframework.integration.endpoint.AbstractEndpoint.stop(java.lang.Runnable), public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTaskExecutor(java.util.concurrent.Executor), public void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderFactory), public java.lang.String org.springframework.integration.context.IntegrationObjectSupport.getComponentType(), 
public void org.springframework.integration.endpoint.AbstractPollingEndpoint.setTransactionSynchronizationFactory(org.springframework.integration.transaction.TransactionSynchronizationFactory), public final void org.springframework.integration.context.IntegrationObjectSupport.setBeanFactory(org.springframework.beans.factory.BeanFactory)]
        at org.springframework.util.Assert.isNull(Assert.java:92)
        at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:446)
        at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:192)
        at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:136)
        at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:131)
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:57)
        at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
        at org.springframework.integration.config.ServiceActivatorFactoryBean.createMethodInvokingHandler(ServiceActivatorFactoryBean.java:57)
        at org.springframework.integration.config.AbstractStandardMessageHandlerFactoryBean.createHandler(AbstractStandardMessageHandlerFactoryBean.java:97)
        at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:116)
        at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:104)
        at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:46)
        at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:168)
        ... 28 more

    Process finished with exit code 1

更新:

/**
 * Converts an inbound file message into a launch request for {@code job}.
 *
 * <p>No request is produced ({@code null} is returned) when the file's absolute path
 * contains ".gitignore", or when a regular file with the same name already exists
 * under {@code bai/archive/}.
 *
 * @param message message whose payload is the received file
 * @return a launch request carrying the file URL under {@code fileParameterName},
 *         or {@code null} when the file is skipped
 */
@Transformer
    public JobLaunchRequest transform(Message<File> message) {

        LOGGER.debug("File message: {}", message);
        File inputFile = message.getPayload();
        String absolutePath = inputFile.getAbsolutePath();

        if (absolutePath.contains(".gitignore")) {
            return null;
        }

        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString(fileParameterName, "file://" + absolutePath);

        LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());

        // File.getName() returns the last path segment using the platform separator.
        // The previous manual search for '/' never matched Windows-style paths, which
        // silently disabled the archive check there.
        File archivedCopy = new File("bai/archive/" + inputFile.getName());

        // If the file exists in the bai/archive directory, assume it has already been
        // processed and skip it. This is not as reliable as querying the Spring Batch
        // table batch_execution_params, which will be done next.
        if (archivedCopy.exists() && !archivedCopy.isDirectory()) {
            return null;
        }

        // Building the request does not launch the job, so there is nothing to catch
        // here; JobInstanceAlreadyCompleteException would surface where the job is
        // actually launched, not in this method.
        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }

SftpBaiParserJobBridge-context.xml:

  <?xml version="1.0" encoding="UTF-8"?>
    <!-- Bridge from the SFTP file flow (Spring Integration) to the BAI parse job (Spring Batch). -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/batch-integration http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

        <!-- Carries job launch requests from the transformer to the launching gateway. -->
        <int:channel id="outboundJobRequestChannel"/>

        <!-- Queue for gateway replies; each message is also wire-tapped to "logger". -->
        <int:channel id="jobLaunchReplyChannel">
            <int:queue/>
            <int:interceptors>
                <int:wire-tap channel="logger"/>
            </int:interceptors>
        </int:channel>

        <!-- Turns each newly received BAI file message into a Spring Batch job request.
             NOTE(review): the transform method shown earlier in this document can return
             null; the answer at the end of the document explains why a service-activator
             with requires-reply="false" fits that case better than a transformer. -->
        <int:transformer id="fileMessageToJobRequestTransformer"
                         method="transform"
                         input-channel="inboundFileChannel"
                         output-channel="outboundJobRequestChannel">
            <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
                <property name="job" ref="baiParseJob"/>
                <property name="fileParameterName" value="input.file.url"/>
            </bean>
            <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
        </int:transformer>

        <!-- Launches the BAI parse job for every request and publishes the reply. -->
        <batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                         reply-channel="jobLaunchReplyChannel"/>

        <!-- Target of the wire-tap above; logs at WARN level. -->
        <int:logging-channel-adapter id="logger"
                                     logger-name="com.distributedfinance.mbi.bai.job"
                                     level="WARN"/>
    </beans>

SftpInboundReceive-context.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Inbound side of the flow: receives BAI files over SFTP and queues them on inboundFileChannel. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

    <!-- Session settings live in a separate file; presumably it defines sftpSessionFactory. -->
    <import resource="SftpSession-context.xml"/>

    <!-- Queue channel that buffers received files until a downstream poller picks them up. -->
    <int:channel id="inboundFileChannel">
        <int:queue/>
    </int:channel>

    <!-- Inbound channel adapter that receives the BAI files via SFTP; directories,
         deletion policy and schedule come from bai.sftp.* placeholders. -->
    <int-sftp:inbound-channel-adapter id="sftpInboundAdapter"
            channel="inboundFileChannel"
            session-factory="sftpSessionFactory"
            remote-directory="${bai.sftp.remote-dir}"
            local-directory="${bai.sftp.local-dir}"
            auto-create-local-directory="true"
            delete-remote-files="${bai.sftp.delete}"
            preserve-timestamp="true"
            auto-startup="true">
        <int:poller max-messages-per-poll="10" cron="${bai.sftp.cron}"/>
    </int-sftp:inbound-channel-adapter>

</beans>

BaiParserJob.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring Batch definition of the BAI parse job: a chunk-oriented parse step
    (baiParseStep) followed by a tasklet step (baiArchive) that runs fileArchivingTasklet.
    The ${bai.sftp.*} placeholders and the transactionManager bean are not defined
    in this file and must be supplied by the surrounding application context.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch
            http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/util
            http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- Incrementer referenced by the job below. -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer"/>

    <!-- BATCH-2351 workaround -->
    <bean id="stepScope" class="org.springframework.batch.core.scope.StepScope">
        <property name="autoProxy" value="true"/>
    </bean>

    <!-- Job: parse the BAI file item by item, then archive it. -->
    <batch:job id="baiParseJob" incrementer="jobParametersIncrementer">
        <batch:step id="baiParseStep" next="baiArchive">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="baiItemReader"
                             processor="baiItemProcessor"
                             writer="baiItemWriter"
                             commit-interval="1"/>
            </batch:tasklet>
        </batch:step>
        <batch:step id="baiArchive">
            <batch:tasklet ref="fileArchivingTasklet"/>
        </batch:step>
    </batch:job>

    <!-- Step-scoped reader that wraps flatFileItemReader and is configured with one
         field-set mapper per BAI record kind (file, group, account, transaction). -->
    <bean id="baiItemReader" class="com.distributedfinance.mbi.bai.reader.MultiLineBaiItemReader"
          scope="step">
        <property name="delegate" ref="flatFileItemReader"/>
        <property name="baiFileFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiFileFieldSetMapper"/>
        </property>
        <property name="baiGroupFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiGroupFieldSetMapper"/>
        </property>
        <property name="baiAccountFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiAccountFieldSetMapper">
                <property name="parser">
                    <bean class="com.distributedfinance.mbi.bai.mapper.BaiTypeParser"/>
                </property>
            </bean>
        </property>
        <property name="baiTransactionFieldSetMapper">
            <bean class="com.distributedfinance.mbi.bai.mapper.BaiTransactionFieldSetMapper"/>
        </property>
    </bean>

    <!-- Step-scoped so the resource can be bound from the 'input.file.url' job parameter. -->
    <bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="#{jobParameters['input.file.url']}"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Processor built from the account lookup and a yyMMddHHmmss date format. -->
    <bean id="baiItemProcessor" class="com.distributedfinance.mbi.bai.processor.BaiItemProcessor">
        <constructor-arg index="0" ref="accountLookup"/>
        <constructor-arg index="1">
            <bean class="java.text.SimpleDateFormat">
                <constructor-arg value="yyMMddHHmmss"/>
            </bean>
        </constructor-arg>
    </bean>

    <bean id="baiItemWriter" class="com.distributedfinance.mbi.bai.writer.BaiItemWriter"/>

    <bean id="accountLookup" class="com.distributedfinance.mbi.bai.lookup.AccountLookup"/>

    <!-- Tasklet for the baiArchive step; reads the same 'input.file.url' key as the reader. -->
    <bean id="fileArchivingTasklet" class="com.distributedfinance.mbi.bai.tasklet.FileArchivingTasklet">
        <property name="downloadFileKey" value="input.file.url"/>
        <property name="archiveDirectory" value="${bai.sftp.archive-dir}"/>
        <property name="purgeDays" value="${bai.sftp.purge-days}"/>
    </bean>
</beans>

您不能从 service-activator 引用 fileMessageToJobRequestTransformer bean;框架不知道调用哪个方法。实际的转换器 bean 包装在消费者 bean 中。

看起来您根本不需要服务激活器——您已经通过 input-channel="inboundFileChannel" 将转换器订阅到了该通道。

根据您的配置,您在该通道上有两个消费者 - 转换器和服务激活器。

编辑

转换器(transformer)总是要求返回回复;由于您的类可能返回回复,也可能不返回,所以严格来说它不是真正的转换器,而是一个服务。

只需使用:

<!-- Same endpoint declared as a service-activator: with requires-reply="false" the
     transform method may return no reply for a message. -->
<int:service-activator id="fileMessageToJobRequestTransformer"
                       method="transform"
                       requires-reply="false"
                       input-channel="inboundFileChannel"
                       output-channel="outboundJobRequestChannel">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <int:poller fixed-rate="1000"/>
</int:service-activator>