Starting SftpFileSynchronizingMessageSource after autoStartup = "false"

I am new to spring-integration(-sftp).
I use autoStartup = false in the @InboundChannelAdapter annotation for my SftpInboundFileSynchronizingMessageSource, to avoid a flood of exceptions when the local directory is not empty.

How can I start the MessageSource afterwards, preferably without using the control bus pattern?

// Class Annotations
@Configuration
@EnableIntegration
@ContextConfiguration(classes = SftpReadTest.class)
@RunWith(SpringJUnit4ClassRunner.class)

/*
* Omitted Beans: sessionFactory, synchronizer, messageSource and messageHandler
*/

// Method with which I try to start the MessageSource
public void startAdapter() {
    ConfigurableApplicationContext ctx = new 
        AnnotationConfigApplicationContext(this.getClass());
    SftpInboundFileSynchronizingMessageSource messageSource = 
        ctx.getBean("sftpInboundMessageSource", 
        SftpInboundFileSynchronizingMessageSource.class);
    messageSource.start();
}

// Test method
@Test
public void test() throws Exception {
    // given
    File testFile = create(REMOTE_DIR);
    // when
    startAdapter();
    int loopCounter = 0;
    while (!new File(LOCAL_DIR, testFile.getName()).exists()
        && loopCounter++ < 99) {
        Thread.sleep(100);
    }
    log.info("{} looped", loopCounter);
    // then
    assertThat(new File(LOCAL_DIR, testFile.getName()).exists(), is(true));
}

This is my latest attempt. The MessageSource seems to start, but no file is transferred.

2016-12-07 18:22:28,829 INFO  Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener] org.springframework.test.context.support.AbstractTestContextBootstrapper.getDefaultTestExecutionListenerClassNames(AbstractTestContextBootstrapper.java:260) [main] []
2016-12-07 18:22:28,862 INFO  Using TestExecutionListeners: [org.springframework.test.context.web.ServletTestExecutionListener@6c3f5566, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@12405818, org.springframework.test.context.support.DependencyInjectionTestExecutionListener@314c508a, org.springframework.test.context.support.DirtiesContextTestExecutionListener@10b48321, org.springframework.test.context.transaction.TransactionalTestExecutionListener@6b67034, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener@16267862] org.springframework.test.context.support.AbstractTestContextBootstrapper.getTestExecutionListeners(AbstractTestContextBootstrapper.java:187) [main] []
2016-12-07 18:22:28,917 INFO  Using open server port...22                                       com.example.SftpServer.start(SftpServer.java:36) [main] []
2016-12-07 18:22:29,325 INFO  Trying to register BouncyCastle as a JCE provider                 org.apache.sshd.common.util.SecurityUtils$BouncyCastleRegistration.call(SecurityUtils.java:287) [main] []
2016-12-07 18:22:29,769 INFO  Registration succeeded                                            org.apache.sshd.common.util.SecurityUtils$BouncyCastleRegistration.call(SecurityUtils.java:291) [main] []
2016-12-07 18:22:30,023 INFO  Refreshing org.springframework.context.support.GenericApplicationContext@4d48bd85: startup date [Wed Dec 07 18:22:30 CET 2016]; root of context hierarchy org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:581) [main] []
2016-12-07 18:22:30,088 INFO  Loading properties file from URL [jar:file:/C:/dev/maven/maven-repo/org/springframework/integration/spring-integration-core/4.3.0.RELEASE/spring-integration-core-4.3.0.RELEASE.jar!/META-INF/spring.integration.default.properties] org.springframework.core.io.support.PropertiesLoaderSupport.loadProperties(PropertiesLoaderSupport.java:172) [main] []
2016-12-07 18:22:30,093 INFO  No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. org.springframework.integration.config.IntegrationRegistrar.registerHeaderChannelRegistry(IntegrationRegistrar.java:330) [main] []
2016-12-07 18:22:30,230 INFO  No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor.registerErrorChannel(DefaultConfiguringBeanFactoryPostProcessor.java:130) [main] []
2016-12-07 18:22:30,239 INFO  No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor.registerTaskScheduler(DefaultConfiguringBeanFactoryPostProcessor.java:158) [main] []
2016-12-07 18:22:30,248 INFO  JSR-330 'javax.inject.Inject' annotation found and supported for autowiring org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.<init>(AutowiredAnnotationBeanPostProcessor.java:156) [main] []
2016-12-07 18:22:30,349 INFO  Loading properties file from URL [jar:file:/C:/dev/maven/maven-repo/org/springframework/integration/spring-integration-core/4.3.0.RELEASE/spring-integration-core-4.3.0.RELEASE.jar!/META-INF/spring.integration.default.properties] org.springframework.core.io.support.PropertiesLoaderSupport.loadProperties(PropertiesLoaderSupport.java:172) [main] []
2016-12-07 18:22:30,350 INFO  Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker.postProcessAfterInitialization(PostProcessorRegistrationDelegate.java:328) [main] []
2016-12-07 18:22:30,360 INFO  Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker.postProcessAfterInitialization(PostProcessorRegistrationDelegate.java:328) [main] []
testDir/asd
2016-12-07 18:22:30,837 INFO  Initializing ExecutorService  'taskScheduler'                     org.springframework.scheduling.concurrent.ExecutorConfigurationSupport.initialize(ExecutorConfigurationSupport.java:165) [main] []
2016-12-07 18:22:30,996 INFO  Starting beans in phase -2147483648                               org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:341) [main] []
2016-12-07 18:22:30,997 INFO  Adding {message-handler:sftpReadTest.messageHandler.serviceActivator} as a subscriber to the 'inboundChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [main] []
2016-12-07 18:22:30,998 INFO  Channel 'org.springframework.context.support.GenericApplicationContext@4d48bd85.inboundChannel' has 1 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [main] []
2016-12-07 18:22:30,998 INFO  started sftpReadTest.messageHandler.serviceActivator              org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:97) [main] []
2016-12-07 18:22:30,998 INFO  Starting beans in phase 0                                         org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:341) [main] []
2016-12-07 18:22:30,998 INFO  Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [main] []
2016-12-07 18:22:30,999 INFO  Channel 'org.springframework.context.support.GenericApplicationContext@4d48bd85.errorChannel' has 1 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [main] []
2016-12-07 18:22:30,999 INFO  started _org.springframework.integration.errorLogger              org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:97) [main] []
2016-12-07 18:22:31,051 INFO  Method: StartAdapter                                              com.example.SftpReadTest.startAdapter(SftpReadTest.java:106) [main] []
2016-12-07 18:22:31,066 INFO  Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1afd72ef: startup date [Wed Dec 07 18:22:31 CET 2016]; root of context hierarchy org.springframework.context.support.AbstractApplicationContext.prepareRefresh(AbstractApplicationContext.java:581) [main] []
2016-12-07 18:22:31,077 INFO  No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. org.springframework.integration.config.IntegrationRegistrar.registerHeaderChannelRegistry(IntegrationRegistrar.java:330) [main] []
2016-12-07 18:22:31,084 INFO  No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor.registerErrorChannel(DefaultConfiguringBeanFactoryPostProcessor.java:130) [main] []
2016-12-07 18:22:31,085 INFO  No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor.registerTaskScheduler(DefaultConfiguringBeanFactoryPostProcessor.java:158) [main] []
2016-12-07 18:22:31,086 INFO  JSR-330 'javax.inject.Inject' annotation found and supported for autowiring org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.<init>(AutowiredAnnotationBeanPostProcessor.java:156) [main] []
2016-12-07 18:22:31,106 INFO  Loading properties file from URL [jar:file:/C:/dev/maven/maven-repo/org/springframework/integration/spring-integration-core/4.3.0.RELEASE/spring-integration-core-4.3.0.RELEASE.jar!/META-INF/spring.integration.default.properties] org.springframework.core.io.support.PropertiesLoaderSupport.loadProperties(PropertiesLoaderSupport.java:172) [main] []
2016-12-07 18:22:31,107 INFO  Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker.postProcessAfterInitialization(PostProcessorRegistrationDelegate.java:328) [main] []
2016-12-07 18:22:31,107 INFO  Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker.postProcessAfterInitialization(PostProcessorRegistrationDelegate.java:328) [main] []
testDir/asd
2016-12-07 18:22:31,201 INFO  Initializing ExecutorService  'taskScheduler'                     org.springframework.scheduling.concurrent.ExecutorConfigurationSupport.initialize(ExecutorConfigurationSupport.java:165) [main] []
2016-12-07 18:22:31,331 INFO  Starting beans in phase -2147483648                               org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:341) [main] []
2016-12-07 18:22:31,332 INFO  Adding {message-handler:sftpReadTest.messageHandler.serviceActivator} as a subscriber to the 'inboundChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [main] []
2016-12-07 18:22:31,338 INFO  Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@1afd72ef.inboundChannel' has 1 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [main] []
2016-12-07 18:22:31,339 INFO  started sftpReadTest.messageHandler.serviceActivator              org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:97) [main] []
2016-12-07 18:22:31,339 INFO  Starting beans in phase 0                                         org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:341) [main] []
2016-12-07 18:22:31,339 INFO  Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [main] []
2016-12-07 18:22:31,340 INFO  Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@1afd72ef.errorChannel' has 1 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [main] []
2016-12-07 18:22:31,340 INFO  started _org.springframework.integration.errorLogger              org.springframework.integration.endpoint.AbstractEndpoint.start(AbstractEndpoint.java:97) [main] []
2016-12-07 18:22:31,342 INFO  Bean: org.springframework.context.annotation.internalConfigurationAnnotationProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,343 INFO  Bean: org.springframework.context.annotation.internalAutowiredAnnotationProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,343 INFO  Bean: org.springframework.context.annotation.internalRequiredAnnotationProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,343 INFO  Bean: org.springframework.context.annotation.internalCommonAnnotationProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,344 INFO  Bean: org.springframework.context.event.internalEventListenerProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,344 INFO  Bean: org.springframework.context.event.internalEventListenerFactory com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,344 INFO  Bean: sftpReadTest                                                com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,344 INFO  Bean: org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,344 INFO  Bean: org.springframework.context.annotation.ConfigurationClassPostProcessor.enhancedConfigurationProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,345 INFO  Bean: sessionFactory                                              com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,345 INFO  Bean: sftpInboundMessageSource                                    com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,345 INFO  Bean: sftpFileSynchronizer                                        com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,345 INFO  Bean: messageHandler                                              com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,345 INFO  Bean: channelInitializer                                          com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: $autoCreateChannelCandidates                                com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: IntegrationConfigurationBeanFactoryPostProcessor            com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: integrationEvaluationContext                                com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: org.springframework.integration.expression.IntegrationEvaluationContextAwareBeanPostProcessor#0 com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: integrationGlobalProperties                                 com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: integrationHeaderChannelRegistry                            com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,346 INFO  Bean: globalChannelInterceptorProcessor                           com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: DefaultConfiguringBeanFactoryPostProcessor                  com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: datatypeChannelMessageConverter                             com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: org.springframework.integration.internalMessagingAnnotationPostProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: org.springframework.integration.internalPublisherAnnotationBeanPostProcessor com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: messageBuilderFactory                                       com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,347 INFO  Bean: integrationLifecycleRoleController                          com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: nullChannel                                                 com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: errorChannel                                                com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: _org.springframework.integration.errorLogger.handler        com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: _org.springframework.integration.errorLogger                com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: taskScheduler                                               com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:31,348 INFO  Bean: org.springframework.integration.config.IdGeneratorConfigurer#0 com.example.SftpReadTest.lambda(SftpReadTest.java:108) [main] []
2016-12-07 18:22:43,251 INFO  100 mal geloopt                                                   com.example.SftpReadTest.test(SftpReadTest.java:141) [main] []
2016-12-07 18:22:43,328 INFO  Closing org.springframework.context.support.GenericApplicationContext@4d48bd85: startup date [Wed Dec 07 18:22:30 CET 2016]; root of context hierarchy org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:982) [Thread-1] []
2016-12-07 18:22:43,329 INFO  Stopping beans in phase 0                                         org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:356) [Thread-1] []
2016-12-07 18:22:43,330 INFO  Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [Thread-1] []
2016-12-07 18:22:43,331 INFO  Channel 'org.springframework.context.support.GenericApplicationContext@4d48bd85.errorChannel' has 0 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [Thread-1] []
2016-12-07 18:22:43,331 INFO  stopped _org.springframework.integration.errorLogger              org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:131) [Thread-1] []
2016-12-07 18:22:43,331 INFO  Stopping beans in phase -2147483648                               org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:356) [Thread-1] []
2016-12-07 18:22:43,331 INFO  Removing {message-handler:sftpReadTest.messageHandler.serviceActivator} as a subscriber to the 'inboundChannel' channel org.springframework.integration.endpoint.EventDrivenConsumer.logComponentSubscriptionEvent(EventDrivenConsumer.java:108) [Thread-1] []
2016-12-07 18:22:43,331 INFO  Channel 'org.springframework.context.support.GenericApplicationContext@4d48bd85.inboundChannel' has 0 subscriber(s). org.springframework.integration.channel.AbstractSubscribableChannel.adjustCounterIfNecessary(AbstractSubscribableChannel.java:69) [Thread-1] []
2016-12-07 18:22:43,331 INFO  stopped sftpReadTest.messageHandler.serviceActivator              org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:131) [Thread-1] []
2016-12-07 18:22:43,332 INFO  Shutting down ExecutorService 'taskScheduler'                     org.springframework.scheduling.concurrent.ExecutorConfigurationSupport.shutdown(ExecutorConfigurationSupport.java:203) [Thread-1] []

Any help with this problem is much appreciated, as are hints on where to look.

The documentation for @InboundChannelAdapter says:

This produces a SourcePollingChannelAdapter integration component based on a MethodInvokingMessageSource for the annotated method.

So what you are doing is correct in principle, but you have to retrieve the SourcePollingChannelAdapter bean and start that, not the SftpInboundFileSynchronizingMessageSource.

Its bean id follows this pattern:

The bean names are generated with this algorithm:
* The MessageHandler (MessageSource) @Bean gets its own standard name from the method name or name attribute on the @Bean. This works like there is no Messaging Annotation on the @Bean method.
* The AbstractEndpoint bean name is generated with the pattern: [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]. For example the endpoint (SourcePollingChannelAdapter) for the consoleSource() definition above gets a bean name like: myFlowConfiguration.consoleSource.inboundChannelAdapter.

You can find this description in the Reference Manual.
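
To make the naming pattern concrete, here is a minimal sketch in plain Java that assembles the endpoint bean name from its three parts. The class EndpointBeanNames and its methods are hypothetical helpers written for illustration, not part of Spring Integration. The values "sftpReadTest" and "sftpInboundMessageSource" are taken from the bean list in the question's log, and the sketch assumes that the @Bean method itself is named sftpInboundMessageSource (if the bean name was set through the name attribute instead, the method name would differ).

```java
public class EndpointBeanNames {

    // Lower-cases the first character, e.g. "InboundChannelAdapter" -> "inboundChannelAdapter".
    static String decapitalize(String s) {
        if (s == null || s.isEmpty()) {
            return s;
        }
        return Character.toLowerCase(s.charAt(0)) + s.substring(1);
    }

    // Applies the documented pattern:
    // [configurationComponentName].[methodName].[decapitalizedAnnotationClassShortName]
    static String endpointBeanName(String configurationComponentName,
                                   String methodName,
                                   String annotationClassShortName) {
        return configurationComponentName + "." + methodName + "."
                + decapitalize(annotationClassShortName);
    }

    public static void main(String[] args) {
        // Assumed inputs: configuration bean "sftpReadTest" and a @Bean method
        // named "sftpInboundMessageSource", as suggested by the question's log.
        System.out.println(endpointBeanName(
                "sftpReadTest", "sftpInboundMessageSource", "InboundChannelAdapter"));
        // prints "sftpReadTest.sftpInboundMessageSource.inboundChannelAdapter"
    }
}
```

Under those assumptions, startAdapter() would look up a bean with that name and type SourcePollingChannelAdapter and call start() on it. The log in the question also shows a second context being refreshed inside startAdapter(), so it is probably better to take the bean from the context the test runner has already loaded than to create a new AnnotationConfigApplicationContext.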