配置了 SFTP 入站通道以从 SFTP 服务器获取文件,但我的测试用例没有使它生效
Configured a SFTP inbound channel to get a file from a SFTP server but my test case does not bring it alive
我想从 SFTP 服务器获取文件,将 JSON 内容转换为我的 类 之一的实例数组。
因此,我考虑过在 Spring 引导应用程序中使用 Spring 集成,并且很高兴能够流畅地编程 IntegrationFlows 之一来实现这一点。
到目前为止,我在 SO 中深入研究了许多文章、问题和答案,但没有任何帮助。几个星期以来我一直在努力解决这个问题。 Spring 集成文档本身就很难理解。根据 Spring 版本和不同的编程范式(XML、Java 配置和 Java DSL),它有很多不同的方面,这给问题,这使得决定遵循哪些罕见的例子变得更加困难,并且最终对新手毫无帮助。一个新手想要找到他的问题,并希望被推荐和引导到当前解决他的问题的最佳方法,被解释为什么和赔率和偶数。如果他足够高兴,他将能够在其中一个示例中找到他的问题(从 SFTP 获取文件以进一步处理它,这不是一项奇异的任务)并且解决方案非常适合示例,他可以用最少的复制和粘贴改编。直到现在我才这么幸运。
SO 中的一个问题接近我可能需要的:配置了一个 sftpInboundFileSynchonizer,以及一个 MessageSource 和一个 MassageHandler。提问者想和我做大致相同的事情。答案是“该代码正在做相反的事情(发送到 SFTP)”。这让我目瞪口呆,我是不是把inbound和outbound的理解搞混了?
我想我必须使用基于 SftpSessionFactory 的 InboundAdapter。我将 SessionFactory 放入一个额外的配置中,因为我也计划将它与其他适配器一起使用:
@Configuration
@ConfigurationProperties("test.job.common")
public class JobCommonConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(JobCommonConfiguration.class);
private String hostname;
private int port;
private String username;
private String password;
@Bean
public SessionFactory<LsEntry> sftpTest3SessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(hostname);
sf.setPort(port);
sf.setUser(username);
sf.setPassword(password);
// factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(sf);
}
/* getters & setters */
}
第二个配置是配置一个SftpInboundAdapter,假设我对“入站”的理解是正确的,它带有一个SI Transformer来将JSON转换成我的事件实例数组.最后,实例应该通过请求发送到 HTTP REST 服务,我也可以将其包含到适配器中吗?具体如下:
@Configuration
@ConfigurationProperties("test.job.transfer3")
@Import({ JobCommonConfiguration.class })
public class Job3Configuration {
private static final Logger LOG = LoggerFactory.getLogger(Job3Configuration.class);
private boolean enabled = false;
private String remoteDir;
private String remoteFile;
private String remoteFilePattern;
private boolean remoteRemove;
private String localDir;
private String localFile;
@Autowired
private SessionFactory<LsEntry> sftpTest3SessionFactory;
@Bean
public FireOnceTrigger fireOnceTest3Trigger() {
return new FireOnceTrigger();
}
@Bean
public IntegrationFlow test3SftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpTest3SessionFactory)
.preserveTimestamp(true)
.remoteDirectory(remoteDir)
.regexFilter(remoteFilePattern)
.localFilenameExpression(localFile)
.localDirectory(new File(localDir)),
e -> e.id("sftpTest3InboundAdapter")
.autoStartup(true)
.poller(Pollers.trigger(fireOnceTest3Trigger()))
)
.transform(Transformers.fromJson(Event[].class))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
/* getters & setters */
}
我的实体事件很简单:
public class Event {
private Integer crmId;
private String eventType;
private LocalDateTime dateFrom;
private LocalDateTime dateTo;
/* getters & setter & toString() */
}
我的测试还没有测试任何东西,因为到目前为止什么都没有发生。然后它应该断言我收到了正确数量的事件。它看起来如下:
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes=SftpTransferTestApplication.class)
@Import(Job3Configuration.class)
public class GetFileIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);
@Test
public void testGetFile() throws Exception {
LOG.info("GetIntegrationTest testgetFile");
// assertThat(fileReceived, is(sb.toString()));
}
}
测试应用程序很简单:
@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SftpTransferTestApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpTransferTestApplication.class).web(
NONE).run(args);
}
}
我的项目有 2.3 版的 spring-boot-starter-parent 作为父项目。0.RELEASE 并使用 5.3 版的 spring-integration-sftp。 0.RELEASE.
请帮我把这个测试用例激活。我做错了什么?
如何将 Logger 功能包含到 IntegrationFlow 中以查看更多正在发生(或未发生)的事情?
编辑 1
我尝试稍微剥离我的代码以避免配置问题:
@Configuration
public class JobConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(JobConfiguration.class);
@Bean
public TransferChannel getTransferChannel() {
TransferChannel channel = new TransferChannel();
channel.setHost("myHost");
channel.setPort(0);
channel.setUser("test");
channel.setPassword("xxx");
return channel;
}
@Bean
public TransferContext getTransferContext() {
TransferContext context = new TransferContext();
context.setEnabled(false);
context.setChannel(getTransferChannel());
context.setRemoteDir("data");
context.setRemoteFilename("GetMessage3.json");
context.setRemoteFilenameFilter("GetMessage\.json$");
context.setLocalDir("sftp-inbound");
context.setLocalFile("GetMessage3.json");
return context;
}
@Bean
public SessionFactory<LsEntry> getSftpTestSessionFactory(TransferChannel transferChannel) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(transferChannel.getHost());
sf.setPort(transferChannel.getPort());
sf.setUser(transferChannel.getUser());
sf.setPassword(transferChannel.getPassword());
// factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public FireOnceTrigger fireOnceTestTrigger() {
return new FireOnceTrigger();
}
@Bean
public IntegrationFlow testSftpInboundFlow(TransferContext context) {
return IntegrationFlows
.from(Sftp.inboundAdapter(getSftpTestSessionFactory(context.getChannel()))
.preserveTimestamp(true)
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilenameFilter())
.localFilenameExpression(context.getLocalFile())
.localDirectory(new File(context.getLocalDir())),
e -> e.id("sftpTestInboundAdapter")
.autoStartup(true)
.poller(Pollers.trigger(fireOnceTestTrigger()))
// .poller(Pollers.fixedDelay(5000))
)
.transform(Transformers.fromJson(Event[].class))
.channel("sftpTestMessageChannel")
// .logAndReply(Level.DEBUG);
// .handle(m -> System.out.println("myHandler: " + m.getPayload().toString()))
.get();
}
@Bean("someService.handler")
@EndpointId("someService")
@ServiceActivator(inputChannel = "sftpTestMessageChannel")
public MessageHandler someHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("myThirdHandler: " + message.getPayload());
System.out.println("myThirdHandler: " + message.getHeaders());
Event ev = ((Event[]) message.getPayload())[1];
System.out.println("myThirdHandler: " + ev);
throw new IllegalStateException("Want to see the next MessageHandler");
}
};
}
}
为了让ftp实现Jsch多说话一点,我有一个配置“application-junit.yaml”:
logging:
level:
org.springframework.integration: debug
### filter warning
### org.springframework.integration.expression.ExpressionUtils:
### Creating EvaluationContext with no beanFactory
org.springframework.integration.expression.ExpressionUtils: error
### filter info
### because jsch is very talkative
com.jcraft.jsch: debug
com.harry.potter: debug
com.harry.potter.filetransfer: trace
我无法让它工作。日志输出为:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.0.RELEASE)
2021-08-22 23:29:27.958 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Starting GetFileIntegrationTest on myClient with PID 9916 (started by test in C:\Users\test\git\tis\sftp-client)
2021-08-22 23:29:27.959 DEBUG 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Running with Spring Boot v2.3.0.RELEASE, Spring v5.2.6.RELEASE
2021-08-22 23:29:27.959 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : No active profile set, falling back to default profiles: default
2021-08-22 23:29:28.752 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-22 23:29:28.759 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-22 23:29:28.763 DEBUG 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
2021-08-22 23:29:28.766 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-22 23:29:28.837 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.841 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.873 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:30.298 INFO 9916 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-08-22 23:29:30.802 INFO 9916 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-08-22 23:29:30.894 DEBUG 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor :
Spring Integration global properties:
spring.integration.endpoints.noAutoStartup=
spring.integration.taskScheduler.poolSize=10
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.autoCreate=true
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=false
2021-08-22 23:29:30.901 DEBUG 9916 --- [ main] .s.i.c.GlobalChannelInterceptorProcessor : No global channel interceptors.
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.sftpTestMessageChannel' has 1 subscriber(s).
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'someService'
2021-08-22 23:29:30.910 INFO 9916 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.922 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Started GetFileIntegrationTest in 3.323 seconds (JVM running for 4.13)
2021-08-22 23:29:30.935 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Connecting to myHost port 22
2021-08-22 23:29:30.968 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Connection established
2021-08-22 23:29:31.041 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Remote version string: SSH-2.0-OpenSSH_for_Windows_8.1
2021-08-22 23:29:31.042 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Local version string: SSH-2.0-JSCH-0.1.54
2021-08-22 23:29:31.042 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-08-22 23:29:31.216 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : GetIntegrationTest testgetFile
2021-08-22 23:29:31.226 INFO 9916 --- [extShutdownHook] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel : Channel 'application.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel : Channel 'application.sftpTestMessageChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'someService'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2021-08-22 23:29:31.228 INFO 9916 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
我用 Pollers.fixedDelay(5000) 替换了 FireOnceTestTrigger,没有。
我做错了什么?
测试正在退出,JVM 在获取完成之前关闭。
您需要等待文件被提取。一种方法是在您的测试中添加一个通道拦截器。像这样:
public class GetFileIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);
@Autowired
AbstractMessageChannel sftpTestMessageChannel;
@Test
public void testGetFile() throws Exception {
LOG.info("GetIntegrationTest testgetFile");
CountDownLatch latch = new CountDownLatch(1);
this.sftpTestMessageChannel.addInterceptor(new ChannelInterceptor() {
// override preSend and/or postSend, capture the message and
// count down the latch.
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
// assertThat(fileReceived, is(sb.toString()));
}
}
我想从 SFTP 服务器获取文件,将 JSON 内容转换为我的 类 之一的实例数组。 因此,我考虑过在 Spring 引导应用程序中使用 Spring 集成,并且很高兴能够流畅地编程 IntegrationFlows 之一来实现这一点。
到目前为止,我在 SO 中深入研究了许多文章、问题和答案,但没有任何帮助。几个星期以来我一直在努力解决这个问题。 Spring 集成文档本身就很难理解。根据 Spring 版本和不同的编程范式(XML、Java 配置和 Java DSL),它有很多不同的方面,这给问题,这使得决定遵循哪些罕见的例子变得更加困难,并且最终对新手毫无帮助。一个新手想要找到他的问题,并希望被推荐和引导到当前解决他的问题的最佳方法,被解释为什么和赔率和偶数。如果他足够高兴,他将能够在其中一个示例中找到他的问题(从 SFTP 获取文件以进一步处理它,这不是一项奇异的任务)并且解决方案非常适合示例,他可以用最少的复制和粘贴改编。直到现在我才这么幸运。
SO 中的一个问题接近我可能需要的:配置了一个 sftpInboundFileSynchonizer,以及一个 MessageSource 和一个 MassageHandler。提问者想和我做大致相同的事情。答案是“该代码正在做相反的事情(发送到 SFTP)”。这让我目瞪口呆,我是不是把inbound和outbound的理解搞混了?
我想我必须使用基于 SftpSessionFactory 的 InboundAdapter。我将 SessionFactory 放入一个额外的配置中,因为我也计划将它与其他适配器一起使用:
@Configuration
@ConfigurationProperties("test.job.common")
public class JobCommonConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(JobCommonConfiguration.class);
private String hostname;
private int port;
private String username;
private String password;
@Bean
public SessionFactory<LsEntry> sftpTest3SessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(hostname);
sf.setPort(port);
sf.setUser(username);
sf.setPassword(password);
// factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(sf);
}
/* getters & setters */
}
第二个配置是配置一个SftpInboundAdapter,假设我对“入站”的理解是正确的,它带有一个SI Transformer来将JSON转换成我的事件实例数组.最后,实例应该通过请求发送到 HTTP REST 服务,我也可以将其包含到适配器中吗?具体如下:
@Configuration
@ConfigurationProperties("test.job.transfer3")
@Import({ JobCommonConfiguration.class })
public class Job3Configuration {
private static final Logger LOG = LoggerFactory.getLogger(Job3Configuration.class);
private boolean enabled = false;
private String remoteDir;
private String remoteFile;
private String remoteFilePattern;
private boolean remoteRemove;
private String localDir;
private String localFile;
@Autowired
private SessionFactory<LsEntry> sftpTest3SessionFactory;
@Bean
public FireOnceTrigger fireOnceTest3Trigger() {
return new FireOnceTrigger();
}
@Bean
public IntegrationFlow test3SftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpTest3SessionFactory)
.preserveTimestamp(true)
.remoteDirectory(remoteDir)
.regexFilter(remoteFilePattern)
.localFilenameExpression(localFile)
.localDirectory(new File(localDir)),
e -> e.id("sftpTest3InboundAdapter")
.autoStartup(true)
.poller(Pollers.trigger(fireOnceTest3Trigger()))
)
.transform(Transformers.fromJson(Event[].class))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
/* getters & setters */
}
我的实体事件很简单:
public class Event {
private Integer crmId;
private String eventType;
private LocalDateTime dateFrom;
private LocalDateTime dateTo;
/* getters & setter & toString() */
}
我的测试还没有测试任何东西,因为到目前为止什么都没有发生。然后它应该断言我收到了正确数量的事件。它看起来如下:
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes=SftpTransferTestApplication.class)
@Import(Job3Configuration.class)
public class GetFileIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);
@Test
public void testGetFile() throws Exception {
LOG.info("GetIntegrationTest testgetFile");
// assertThat(fileReceived, is(sb.toString()));
}
}
测试应用程序很简单:
@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SftpTransferTestApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpTransferTestApplication.class).web(
NONE).run(args);
}
}
我的项目有 2.3 版的 spring-boot-starter-parent 作为父项目。0.RELEASE 并使用 5.3 版的 spring-integration-sftp。 0.RELEASE.
请帮我把这个测试用例激活。我做错了什么?
如何将 Logger 功能包含到 IntegrationFlow 中以查看更多正在发生(或未发生)的事情?
编辑 1
我尝试稍微剥离我的代码以避免配置问题:
@Configuration
public class JobConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(JobConfiguration.class);
@Bean
public TransferChannel getTransferChannel() {
TransferChannel channel = new TransferChannel();
channel.setHost("myHost");
channel.setPort(0);
channel.setUser("test");
channel.setPassword("xxx");
return channel;
}
@Bean
public TransferContext getTransferContext() {
TransferContext context = new TransferContext();
context.setEnabled(false);
context.setChannel(getTransferChannel());
context.setRemoteDir("data");
context.setRemoteFilename("GetMessage3.json");
context.setRemoteFilenameFilter("GetMessage\.json$");
context.setLocalDir("sftp-inbound");
context.setLocalFile("GetMessage3.json");
return context;
}
@Bean
public SessionFactory<LsEntry> getSftpTestSessionFactory(TransferChannel transferChannel) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(transferChannel.getHost());
sf.setPort(transferChannel.getPort());
sf.setUser(transferChannel.getUser());
sf.setPassword(transferChannel.getPassword());
// factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public FireOnceTrigger fireOnceTestTrigger() {
return new FireOnceTrigger();
}
@Bean
public IntegrationFlow testSftpInboundFlow(TransferContext context) {
return IntegrationFlows
.from(Sftp.inboundAdapter(getSftpTestSessionFactory(context.getChannel()))
.preserveTimestamp(true)
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilenameFilter())
.localFilenameExpression(context.getLocalFile())
.localDirectory(new File(context.getLocalDir())),
e -> e.id("sftpTestInboundAdapter")
.autoStartup(true)
.poller(Pollers.trigger(fireOnceTestTrigger()))
// .poller(Pollers.fixedDelay(5000))
)
.transform(Transformers.fromJson(Event[].class))
.channel("sftpTestMessageChannel")
// .logAndReply(Level.DEBUG);
// .handle(m -> System.out.println("myHandler: " + m.getPayload().toString()))
.get();
}
@Bean("someService.handler")
@EndpointId("someService")
@ServiceActivator(inputChannel = "sftpTestMessageChannel")
public MessageHandler someHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("myThirdHandler: " + message.getPayload());
System.out.println("myThirdHandler: " + message.getHeaders());
Event ev = ((Event[]) message.getPayload())[1];
System.out.println("myThirdHandler: " + ev);
throw new IllegalStateException("Want to see the next MessageHandler");
}
};
}
}
为了让ftp实现Jsch多说话一点,我有一个配置“application-junit.yaml”:
logging:
level:
org.springframework.integration: debug
### filter warning
### org.springframework.integration.expression.ExpressionUtils:
### Creating EvaluationContext with no beanFactory
org.springframework.integration.expression.ExpressionUtils: error
### filter info
### because jsch is very talkative
com.jcraft.jsch: debug
com.harry.potter: debug
com.harry.potter.filetransfer: trace
我无法让它工作。日志输出为:
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.0.RELEASE)
2021-08-22 23:29:27.958 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Starting GetFileIntegrationTest on myClient with PID 9916 (started by test in C:\Users\test\git\tis\sftp-client)
2021-08-22 23:29:27.959 DEBUG 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Running with Spring Boot v2.3.0.RELEASE, Spring v5.2.6.RELEASE
2021-08-22 23:29:27.959 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : No active profile set, falling back to default profiles: default
2021-08-22 23:29:28.752 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-22 23:29:28.759 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-08-22 23:29:28.763 DEBUG 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
2021-08-22 23:29:28.766 INFO 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-22 23:29:28.837 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.841 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:28.873 INFO 9916 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-22 23:29:30.298 INFO 9916 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2021-08-22 23:29:30.802 INFO 9916 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-08-22 23:29:30.894 DEBUG 9916 --- [ main] faultConfiguringBeanFactoryPostProcessor :
Spring Integration global properties:
spring.integration.endpoints.noAutoStartup=
spring.integration.taskScheduler.poolSize=10
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.autoCreate=true
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=false
2021-08-22 23:29:30.901 DEBUG 9916 --- [ main] .s.i.c.GlobalChannelInterceptorProcessor : No global channel interceptors.
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-22 23:29:30.904 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.testSftpInboundFlow.channel#0' has 1 subscriber(s).
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.sftpTestMessageChannel' has 1 subscriber(s).
2021-08-22 23:29:30.905 INFO 9916 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'someService'
2021-08-22 23:29:30.910 INFO 9916 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:30.922 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : Started GetFileIntegrationTest in 3.323 seconds (JVM running for 4.13)
2021-08-22 23:29:30.935 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Connecting to myHost port 22
2021-08-22 23:29:30.968 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Connection established
2021-08-22 23:29:31.041 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Remote version string: SSH-2.0-OpenSSH_for_Windows_8.1
2021-08-22 23:29:31.042 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : Local version string: SSH-2.0-JSCH-0.1.54
2021-08-22 23:29:31.042 INFO 9916 --- [ask-scheduler-1] com.jcraft.jsch : CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2021-08-22 23:29:31.216 INFO 9916 --- [ main] c.l.c.f.service.GetFileIntegrationTest : GetIntegrationTest testgetFile
2021-08-22 23:29:31.226 INFO 9916 --- [extShutdownHook] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpTestInboundAdapter'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'testSftpInboundFlow.channel#0' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel : Channel 'application.testSftpInboundFlow.channel#0' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'testSftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/filetransfer/config/JobConfiguration.class]'; from source: 'bean method testSftpInboundFlow'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:someService} as a subscriber to the 'sftpTestMessageChannel' channel
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.integration.channel.DirectChannel : Channel 'application.sftpTestMessageChannel' has 0 subscriber(s).
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'someService'
2021-08-22 23:29:31.227 INFO 9916 --- [extShutdownHook] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2021-08-22 23:29:31.228 INFO 9916 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
我用 Pollers.fixedDelay(5000) 替换了 FireOnceTestTrigger,没有。
我做错了什么?
测试正在退出,JVM 在获取完成之前关闭。
您需要等待文件被提取。一种方法是在您的测试中添加一个通道拦截器。像这样:
public class GetFileIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(GetFileIntegrationTest.class);
@Autowired
AbstractMessageChannel sftpTestMessageChannel;
@Test
public void testGetFile() throws Exception {
LOG.info("GetIntegrationTest testgetFile");
CountDownLatch latch = new CountDownLatch(1);
this.sftpTestMessageChannel.addInterceptor(new ChannelInterceptor() {
// override preSend and/or postSend, capture the message and
// count down the latch.
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
// assertThat(fileReceived, is(sb.toString()));
}
}