Configured an SFTP inbound channel to get a file from an SFTP server, but my test case does not bring it alive

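I want to get a file from an SFTP server and convert its JSON content into an array of instances of one of my classes. I therefore thought of using Spring Integration in a Spring Boot application, and I would be happy to have one of the IntegrationFlows programmed fluently to achieve this.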

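So far I have dug through many articles, questions and answers here on SO, but nothing really helped. I have been struggling with this for weeks. The Spring Integration documentation is hard to understand in itself. It comes with many different aspects depending on the Spring version and the programming paradigm (XML, Java configuration, and Java DSL), which adds complexity, makes it even harder to decide which of the rare examples to follow, and is ultimately no help for a newbie. A newbie wants to find his problem and be guided to the currently best way of solving it, with the whys and the pros and cons explained. If he is lucky, he finds his problem in one of the examples (getting a file from SFTP for further processing is not an exotic task) and the solution fits so well that he can adapt it with minimal copy and paste. I have not been that lucky so far.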

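One question on SO came close to what I probably need: it configured an sftpInboundFileSynchronizer together with a MessageSource and a MessageHandler. The asker wanted to do roughly the same thing as I do. The answer was "That code is doing the reverse (sending to SFTP)". That left me flabbergasted. Did I mix up the meaning of inbound and outbound?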

I think I have to use an InboundAdapter based on an SftpSessionFactory. I put the SessionFactory into a separate configuration because I plan to use it with other adapters as well:

@Configuration
@ConfigurationProperties("test.job.common")
public class JobCommonConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(JobCommonConfiguration.class);

    private String hostname;
    private int port;
    private String username;
    private String password;

    @Bean
    public SessionFactory<LsEntry> sftpTest3SessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(hostname);
        sf.setPort(port);
        sf.setUser(username);
        sf.setPassword(password);
//      factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    /* getters & setters */

}

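The second configuration sets up an SftpInboundAdapter (assuming my understanding of "inbound" is correct) together with an SI Transformer that converts the JSON into an array of my Event instances. In the end the instances should be sent to an HTTP REST service by request; could I include that in the adapter as well? It looks as follows: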

@Configuration
@ConfigurationProperties("test.job.transfer3")
@Import({ JobCommonConfiguration.class })
public class Job3Configuration {
    private static final Logger LOG = LoggerFactory.getLogger(Job3Configuration.class);

    private boolean enabled = false;
    private String remoteDir;
    private String remoteFile;
    private String remoteFilePattern;
    private boolean remoteRemove;
    private String localDir;
    private String localFile;

    @Autowired
    private SessionFactory<LsEntry> sftpTest3SessionFactory;

    @Bean
    public FireOnceTrigger fireOnceTest3Trigger() {
        return new FireOnceTrigger();
    }

    @Bean
    public IntegrationFlow test3SftpInboundFlow() {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(sftpTest3SessionFactory)
                .preserveTimestamp(true)
                .remoteDirectory(remoteDir)
                .regexFilter(remoteFilePattern)
                .localFilenameExpression(localFile)
                .localDirectory(new File(localDir)),
            e -> e.id("sftpTest3InboundAdapter")
                .autoStartup(true)
                .poller(Pollers.trigger(fireOnceTest3Trigger()))
            )
            .transform(Transformers.fromJson(Event[].class))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }

    /* getters & setters */
}

My entity Event is simple:

public class Event {

    private Integer crmId;
    private String eventType;
    private LocalDateTime dateFrom;
    private LocalDateTime dateTo;

    /* getters & setter & toString() */
}

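My test does not test anything yet, because nothing happens so far. Later it should assert that I received the correct number of Events. It looks as follows: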

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes=SftpTransferTestApplication.class)
@Import(Job3Configuration.class)
public class GetFileIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);

    @Test
    public void testGetFile() throws Exception {
        LOG.info("GetIntegrationTest testgetFile");

//      assertThat(fileReceived, is(sb.toString()));
    }
}

The test application is simple as well:

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SftpTransferTestApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpTransferTestApplication.class).web(
            NONE).run(args);
    }
}

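My project has spring-boot-starter-parent version 2.3.0.RELEASE as its parent and uses spring-integration-sftp version 5.3.0.RELEASE.

Please help me bring this test case alive. What am I doing wrong?

How can I include Logger functionality in the IntegrationFlow to see more of what is happening (or not happening)?

EDIT 1

I tried to strip down my code a bit to rule out configuration problems: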

@Configuration
public class JobConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(JobConfiguration.class);
    @Bean
    public TransferChannel getTransferChannel() {
        TransferChannel channel = new TransferChannel();
        channel.setHost("myHost");
        channel.setPort(0);
        channel.setUser("test");
        channel.setPassword("xxx");
        return channel;
    }

    @Bean
    public TransferContext getTransferContext() {
        TransferContext context = new TransferContext();
        context.setEnabled(false);
        context.setChannel(getTransferChannel());
        context.setRemoteDir("data");
        context.setRemoteFilename("GetMessage3.json");
        context.setRemoteFilenameFilter("GetMessage\\.json$");
        context.setLocalDir("sftp-inbound");
        context.setLocalFile("GetMessage3.json");
        return context;
    }

    @Bean
    public SessionFactory<LsEntry> getSftpTestSessionFactory(TransferChannel transferChannel) {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(transferChannel.getHost());
        sf.setPort(transferChannel.getPort());
        sf.setUser(transferChannel.getUser());
        sf.setPassword(transferChannel.getPassword());
//      factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public FireOnceTrigger fireOnceTestTrigger() {
        return new FireOnceTrigger();
    }

    @Bean
    public IntegrationFlow testSftpInboundFlow(TransferContext context) {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(getSftpTestSessionFactory(context.getChannel()))
                    .preserveTimestamp(true)
                    .remoteDirectory(context.getRemoteDir())
                    .regexFilter(context.getRemoteFilenameFilter())
                    .localFilenameExpression(context.getLocalFile())
                    .localDirectory(new File(context.getLocalDir())),
                e -> e.id("sftpTestInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.trigger(fireOnceTestTrigger()))
//                  .poller(Pollers.fixedDelay(5000))
                )
            .transform(Transformers.fromJson(Event[].class))
            .channel("sftpTestMessageChannel")
//          .logAndReply(Level.DEBUG);
//          .handle(m -> System.out.println("myHandler: " + m.getPayload().toString()))
            .get();
    }

    @Bean("someService.handler")             
    @EndpointId("someService")               
    @ServiceActivator(inputChannel = "sftpTestMessageChannel")
    public MessageHandler someHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("myThirdHandler: " + message.getPayload());
                System.out.println("myThirdHandler: " + message.getHeaders());
                Event ev = ((Event[]) message.getPayload())[1];
                System.out.println("myThirdHandler: " + ev);
                throw new IllegalStateException("Want to see the next MessageHandler");
            }
        };
    }
}

To make JSch, the SFTP implementation, a bit more talkative, I have a configuration file "application-junit.yaml":

logging:
  level:
    org.springframework.integration: debug
    ### filter warning
    ### org.springframework.integration.expression.ExpressionUtils:
    ### Creating EvaluationContext with no beanFactory
    org.springframework.integration.expression.ExpressionUtils: error
    ### filter info
    ###    because jsch is very talkative
    com.jcraft.jsch: debug
    com.harry.potter: debug
    com.harry.potter.filetransfer: trace

I cannot get it to work. The log output is:

  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.0.RELEASE)

2021-08-22 23:29:27.958  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Starting GetFileIntegrationTest on myClient with PID 9916 (started by test in C:\Users\test\git\tis\sftp-client)
2021-08-22 23:29:27.959 DEBUG 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Running with Spring Boot v2.3.0.RELEASE, Spring v5.2.6.RELEASE
2021-08-22 23:29:27.959  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : No active profile set, falling back to default profiles: default
2021-08-22 23:29:28.752  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-22 23:29:28.759  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-22 23:29:28.763 DEBUG 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
2021-08-22 23:29:28.766  INFO 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-22 23:29:28.837  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.841  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.873  INFO 9916 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:30.298  INFO 9916 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-08-22 23:29:30.802  INFO 9916 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2021-08-22 23:29:30.894 DEBUG 9916 --- [           main] faultConfiguringBeanFactoryPostProcessor : 
Spring Integration global properties:
    
    spring.integration.endpoints.noAutoStartup=
    spring.integration.taskScheduler.poolSize=10
    spring.integration.channels.maxUnicastSubscribers=0x7fffffff
    spring.integration.channels.autoCreate=true
    spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
    spring.integration.readOnly.headers=
    spring.integration.messagingTemplate.throwExceptionOnLateReply=false
  
2021-08-22 23:29:30.901 DEBUG 9916 --- [           main] .s.i.c.GlobalChannelInterceptorProcessor : No global channel interceptors.
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-22 23:29:30.904  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.sftpTestMessageChannel' has 1 subscriber(s).
2021-08-22 23:29:30.905  INFO 9916 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'someService'
2021-08-22 23:29:30.910  INFO 9916 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'sftpTestInboundAdapter'; defined in: 'class path resource     [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.922  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : Started GetFileIntegrationTest in 3.323 seconds (JVM running for 4.13)
2021-08-22 23:29:30.935  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Connecting to myHost port 22
2021-08-22 23:29:30.968  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Connection established
2021-08-22 23:29:31.041  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Remote version string: SSH-2.0-OpenSSH_for_Windows_8.1
2021-08-22 23:29:31.042  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : Local version string: SSH-2.0-JSCH-0.1.54
2021-08-22 23:29:31.042  INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch                          : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-08-22 23:29:31.216  INFO 9916 --- [           main] c.l.c.f.service.GetFileIntegrationTest   : GetIntegrationTest testgetFile
2021-08-22 23:29:31.226  INFO 9916 --- [extShutdownHook] o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel    : Channel 'application.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel    : Channel 'application.sftpTestMessageChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean 'someService'
2021-08-22 23:29:31.227  INFO 9916 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
2021-08-22 23:29:31.228  INFO 9916 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

I replaced the FireOnceTestTrigger with Pollers.fixedDelay(5000). Nothing changed.

What am I doing wrong?

The test is exiting and the JVM is shutting down before the fetch completes.
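The effect can be reproduced without Spring. The following is only a minimal sketch under stated assumptions: `LatchWaitDemo` and `runOnce` are hypothetical names, a plain `ScheduledExecutorService` stands in for the poller's task scheduler thread, and the 300 ms delay stands in for the time the SFTP connection and download take.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchWaitDemo {

    /**
     * Hypothetical stand-in for one test run: the "fetch" happens on a
     * scheduler thread, the caller plays the role of the test method.
     */
    public static boolean runOnce(boolean waitForFetch) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fetched = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // Simulated poll: completes 300 ms after being scheduled.
            scheduler.schedule(() -> {
                fetched.set(true);  // the "file" has arrived
                latch.countDown();  // signal the waiting caller
            }, 300, TimeUnit.MILLISECONDS);

            if (waitForFetch) {
                // Block the caller until the fetch happened or the timeout expires.
                latch.await(5, TimeUnit.SECONDS);
            }
            return fetched.get();
        } finally {
            // Comparable to the context shutting down once the test method returns.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without waiting: " + runOnce(false));
        System.out.println("with latch:      " + runOnce(true));
    }
}
```

Run as written, the first call should report false because the method returns before the scheduled task fires, and the second should report true because the calling thread blocks on the latch until the task has run.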

You need to wait for the file to be fetched. One way to do that is to add a channel interceptor in your test. Something like this:

public class GetFileIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);

    @Autowired
    AbstractMessageChannel sftpTestMessageChannel;

    @Test
    public void testGetFile() throws Exception {
        LOG.info("GetIntegrationTest testgetFile");
        CountDownLatch latch = new CountDownLatch(1);
        this.sftpTestMessageChannel.addInterceptor(new ChannelInterceptor() {

            // Override preSend and/or postSend, capture the message and
            // count down the latch. Shown here with preSend only.
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                latch.countDown();
                return message;
            }
        });

        // Keep the test thread alive until a message arrives or 10 seconds pass.
        assertTrue(latch.await(10, TimeUnit.SECONDS));
//      assertThat(fileReceived, is(sb.toString()));
    }
}
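
Once the test waits long enough, one more detail in EDIT 1 may be worth checking: the remote file is named "GetMessage3.json", while the filter pattern is "GetMessage\.json$", which requires ".json" to follow "GetMessage" directly. The sketch below uses only java.util.regex to compare the two. `RegexFilterCheck` and `accepts` are hypothetical names, the full-match semantics are my assumption about how the adapter's regex filter behaves, and the second pattern is merely one possible alternative, not a recommendation from the original answer.

```java
import java.util.regex.Pattern;

public class RegexFilterCheck {

    /** Returns true if the whole file name matches the given regular expression. */
    public static boolean accepts(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String remoteFile = "GetMessage3.json"; // file name used in EDIT 1

        // Pattern from EDIT 1: no room for the digit between "GetMessage" and ".json".
        System.out.println(accepts("GetMessage\\.json$", remoteFile));

        // Hypothetical alternative that allows optional digits before the extension.
        System.out.println(accepts("GetMessage\\d*\\.json", remoteFile));
    }
}
```

If the first check reports false, as it should for these two strings, the filter would reject the file even when the connection and the polling work.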