每次执行任务时如何强制 SftpInboundAdapter 连接并获取文件?
How do I force the SftpInboundAdapter to connect and get a file every time a task is executed?
这是我的场景:我每个月都会收到一个文件 my.sftp.server.de
,事件数据为 JSON。该文件始终具有完全相同的名称。我知道什么时候提供文件。
然后文件将被提取、处理、删除并使用另一个显示处理时间戳的文件名进行备份。这是通过订阅的 MessageHandler deleteLocalFileService
.
完成的
这是我的 Spring 集成 类:
@Autowired
private Executor taskExecutor;
private ConcurrentMap<String, String> metadata = new ConcurrentHashMap<>();
@Bean
public SessionFactory<LsEntry> sftpSessionFactory(TransferChannel transferChannel) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
...
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public IntegrationFlow sftpInboundFlow(TransferContext context) {
LOG.debug("sftpInboundFlow");
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory(context.getChannel()))
.preserveTimestamp(context.isPreserveTimestamp())
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilename())
.deleteRemoteFiles(context.isRemoteRemove())
.autoCreateLocalDirectory(context.isAutoCreateLocalDir())
.localFilenameExpression(context.getLocalFilename())
.localDirectory(new File(context.getLocalDir()))
.localFilter(new FileSystemPersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(metadata), "")),
e -> e.id("sftpInboundAdapter")
.autoStartup(false)
.poller(Pollers.fixedDelay(15000).taskExecutor(taskExecutor))
)
.transform(Transformers.fromJson(Event[].class))
.channel("sftpInputChannel")
.get();
}
@Bean
public PublishSubscribeChannel sftpInputChannel() {
LOG.debug("sftpInputChannel");
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
LOG.debug("MessageHandler-handler");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.debug("handleMessage message={}", message);
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
LOG.debug("MessageHandler-deleteLocalFileService");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
File f1 = (File) message.getHeaders().get("file_originalFile");
try {
// create backup copy in another directory
// (otherwise the remote and local filters avoid fetch of the (new) file again)
File f2 = new File("/home/app/sftp-inbound-work/EventData.txt.SAVE." +
LocalDateTime.now().format(ISO_LOCAL_DATE_TIME));
FileUtil.copyFile(f1, f2);
LOG.debug("deleteLocalFileService backup={} created", f2);
// remove received file
if (!f1.delete()) {
throw new MessagingException("Could not remove local file, " + f1.getName());
};
metadata.clear();
LOG.debug("deleteLocalFileService file={} removed", f1);
}
catch (IOException io) {
LOG.error("Exception copying local file {}", io);
throw new MessagingException("Could not backup local file, " + f1.getName());
}
}
};
}
@Bean
public MessageChannel controlChannel() {
LOG.debug("controlChannel");
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
LOG.debug("controlBus");
ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
return expressionControlBusFactoryBean;
}
因为我知道何时从外部源提供事件数据文件,所以我有一个方法 runAsTask
作为 Spring 调度程序任务执行。该方法添加一个 ChannelInterceptor 以获取文件的内容,然后通过 ControlChannel 消息启动 Spring 集成集成,最后从接收到的文件中获取和 return 事件数据。
这是核心任务方法:getFile()
方法:
private static final int FILE_GET_TIMEOUT_SEC = 30;
@Autowired
AbstractMessageChannel sftpInputChannel;
@Autowired
MessageChannel controlChannel;
@Scheduled(cron = "${com.harry.potter.job.tasks.harry-potter-events-task.frequency}")
public void runAsTask() {
if (env.isEnabled()) {
LOG.info("runAsTask: Storing Event Data, frequency={}", env.getFrequency());
jobLOG.info("---------------------------------------------------------------");
jobLOG.info("Job started: Storing Event Data");
} else {
LOG.debug("runAsTask");
LOG.debug("runAsTask {} not enabled: Storing Event Data, aborted.", jobId);
return;
}
try {
events = getFile();
/* processing of the event data */
jobLOG.info("Okay, send okay mail");
sendOkayMail();
} catch (Exception e) {
LOG.error("{} failed! Exception: {}", jobId, e);
jobLOG.info("Job has errors, send error mail");
sendErrorMail();
} finally {
LOG.info("{} finished", jobId);
jobLOG.info("Job finished");
}
}
public List<Event> getFile() {
LOG.debug("getFile");
final List<Event> events = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
sftpInputChannel.addInterceptor(new ChannelInterceptor() {
// capture the message and count down the latch
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOG.debug("postSend channel={}, sent={}", channel.toString(), sent);
LOG.trace("postSend message={}", message.getPayload());
// read and transform the content
Arrays.stream((Event[]) message.getPayload()).forEach(e -> events.add(e));
// signal: work has done
latch.countDown();
ChannelInterceptor.super.postSend(message, channel, sent);
}
});
boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
LOG.debug("getFile 'Start' control message sent successfully={}", sent);
try {
LOG.debug("getFile, waiting for the file to be received, timeout={}s", FILE_GET_TIMEOUT_SEC);
if (latch.await(FILE_GET_TIMEOUT_SEC, TimeUnit.SECONDS)) {
return events;
}
}
catch (InterruptedException e) {
LOG.warn("getFile, job was interrupted");
}
throw new IllegalStateException("expected file not available");
}
有两个问题:
多次添加ChannelInterceptor
(见LOGpostSend channel=bean 'sftpInputChannel' ...
)。我应该在哪里添加拦截器才能只有一个?或者如何检查是否已添加拦截器?
文件是第一次从 SFTP 获取(参见日志中的 com.jcraft.jsch
行)。所有后续任务周期都不再与 SFTP 主机进行协商。为了在每个周期获取文件,我被告知对由 Map 支持的 LocalDirectoryFilter
使用 SimpleMetadataStore
并在处理文件后清除此映射,我这样做了。每次任务 运行?
时,我必须更改什么才能从 SFTP 获取文件
这里是两次启动任务的LOG:
2021-10-09 01:35:52.688 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : The following profiles are active: test,cdsclient
2021-10-09 01:35:53.960 WARN [] --- [http-nio-8080-exec-220] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2021-10-09 01:35:54.141 INFO [] --- [http-nio-8080-exec-220] o.s.cloud.context.scope.GenericScope : BeanFactory id=301a6a9f-3809-32e6-b15b-e640dd198297
2021-10-09 01:35:54.149 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-10-09 01:35:54.155 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-10-09 01:35:54.160 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-10-09 01:35:54.282 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.300 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.301 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.363 INFO [] --- [http-nio-8080-exec-220] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1668 ms
2021-10-09 01:35:54.967 WARN [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : No URLs will be polled as dynamic configuration sources.
2021-10-09 01:35:54.967 INFO [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-10-09 01:35:54.981 INFO [] --- [http-nio-8080-exec-220] c.netflix.config.DynamicPropertyFactory : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@db52778f
2021-10-09 01:35:55.158 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInputChannel
2021-10-09 01:35:55.224 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlChannel
2021-10-09 01:35:55.476 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferChannel
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferContext
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpSessionFactory
2021-10-09 01:35:55.486 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInboundFlow
2021-10-09 01:35:55.587 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-handler
2021-10-09 01:35:55.588 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-deleteLocalFileService
2021-10-09 01:35:55.589 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlBus
2021-10-09 01:35:56.271 INFO [] --- [http-nio-8080-exec-220] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:35:56.344 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2021-10-09 01:35:56.380 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml
2021-10-09 01:35:56.722 INFO [] --- [http-nio-8080-exec-220] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Disable delta property : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application is null : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application version is -1: true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server
2021-10-09 01:35:56.953 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : The response status is 200
2021-10-09 01:35:56.955 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Not registering with Eureka server per configuration
2021-10-09 01:35:56.958 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1633736156957 with initial instances count: 69
2021-10-09 01:35:56.959 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.e.s.EurekaServiceRegistry : Registering application MyJob with eureka with status UP
2021-10-09 01:35:56.986 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : Started MyJobApplication in 5.36 seconds (JVM running for 996008.867)
2021-10-09 01:36:00.000 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:00.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:00.031 INFO [] --- [pool-240-thread-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:00.055 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connecting to my.sftp.server.de port 22
2021-10-09 01:36:00.057 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connection established
...
2021-10-09 01:36:00.158 INFO [] --- [pool-240-thread-2] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'my.sftp.server.de' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-10-09 01:36:00.158 WARN [] --- [pool-240-thread-2] com.jcraft.jsch : Permanently added 'my.sftp.server.de' (RSA) to the list of known hosts.
...
2021-10-09 01:36:00.385 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Authentication succeeded (publickey).
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@24b4484b, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=fa7b1773-d522-9f0f-e8ed-8fcf3bebe23a, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736160447}]
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:00.448 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:01.384 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:01.384 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:30.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@b2cd829f, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=e6558e48-db84-fd2f-2195-c43d3e245127, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736190062}]
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:30.394 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:30.395 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
...
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
因为你需要一个倒计时锁存器,你每次都需要一个新的拦截器(或者每次都使用一个带有新锁存器的AtomicReference
)。你可以打电话给removeInterceptor
.
还有一个远程过滤器。
但是,对于这种情况,使用带 GET 命令的出站网关比使用异步入站通道适配器要容易得多。
编辑
示例:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
sf.setPassword(pw);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class)
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
}
}
EDIT2
使用 .route
获取和删除的示例:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return f -> f
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))));
}
}
@MessagingGateway(defaultRequestChannel = "flow.input",
defaultHeaders = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}
EDIT3
以上示例仅使用注释定义了网关代理 - 您不能在注释和 DSL 之间混合配置。下面显示了仅使用 DSL 的网关代理配置的相同内容。
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class, g -> g
.header("method", args -> args.getMethod().getName()))
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}
这是我的场景:我每个月都会收到一个文件 my.sftp.server.de
,事件数据为 JSON。该文件始终具有完全相同的名称。我知道什么时候提供文件。
然后文件将被提取、处理、删除并使用另一个显示处理时间戳的文件名进行备份。这是通过订阅的 MessageHandler deleteLocalFileService
.
这是我的 Spring 集成 类:
@Autowired
private Executor taskExecutor;
private ConcurrentMap<String, String> metadata = new ConcurrentHashMap<>();
@Bean
public SessionFactory<LsEntry> sftpSessionFactory(TransferChannel transferChannel) {
LOG.debug("sftpSessionFactory");
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
...
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public IntegrationFlow sftpInboundFlow(TransferContext context) {
LOG.debug("sftpInboundFlow");
return IntegrationFlows
.from(Sftp.inboundAdapter(sftpSessionFactory(context.getChannel()))
.preserveTimestamp(context.isPreserveTimestamp())
.remoteDirectory(context.getRemoteDir())
.regexFilter(context.getRemoteFilename())
.deleteRemoteFiles(context.isRemoteRemove())
.autoCreateLocalDirectory(context.isAutoCreateLocalDir())
.localFilenameExpression(context.getLocalFilename())
.localDirectory(new File(context.getLocalDir()))
.localFilter(new FileSystemPersistentAcceptOnceFileListFilter(
new SimpleMetadataStore(metadata), "")),
e -> e.id("sftpInboundAdapter")
.autoStartup(false)
.poller(Pollers.fixedDelay(15000).taskExecutor(taskExecutor))
)
.transform(Transformers.fromJson(Event[].class))
.channel("sftpInputChannel")
.get();
}
@Bean
public PublishSubscribeChannel sftpInputChannel() {
LOG.debug("sftpInputChannel");
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler handler() {
LOG.debug("MessageHandler-handler");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOG.debug("handleMessage message={}", message);
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpInputChannel")
public MessageHandler deleteLocalFileService() {
LOG.debug("MessageHandler-deleteLocalFileService");
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
File f1 = (File) message.getHeaders().get("file_originalFile");
try {
// create backup copy in another directory
// (otherwise the remote and local filters avoid fetch of the (new) file again)
File f2 = new File("/home/app/sftp-inbound-work/EventData.txt.SAVE." +
LocalDateTime.now().format(ISO_LOCAL_DATE_TIME));
FileUtil.copyFile(f1, f2);
LOG.debug("deleteLocalFileService backup={} created", f2);
// remove received file
if (!f1.delete()) {
throw new MessagingException("Could not remove local file, " + f1.getName());
};
metadata.clear();
LOG.debug("deleteLocalFileService file={} removed", f1);
}
catch (IOException io) {
LOG.error("Exception copying local file {}", io);
throw new MessagingException("Could not backup local file, " + f1.getName());
}
}
};
}
@Bean
public MessageChannel controlChannel() {
LOG.debug("controlChannel");
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
LOG.debug("controlBus");
ExpressionControlBusFactoryBean expressionControlBusFactoryBean = new ExpressionControlBusFactoryBean();
return expressionControlBusFactoryBean;
}
因为我知道何时从外部源提供事件数据文件,所以我有一个方法 runAsTask
作为 Spring 调度程序任务执行。该方法添加一个 ChannelInterceptor 以获取文件的内容,然后通过 ControlChannel 消息启动 Spring 集成集成,最后从接收到的文件中获取和 return 事件数据。
这是核心任务方法:getFile()
方法:
private static final int FILE_GET_TIMEOUT_SEC = 30;
@Autowired
AbstractMessageChannel sftpInputChannel;
@Autowired
MessageChannel controlChannel;
@Scheduled(cron = "${com.harry.potter.job.tasks.harry-potter-events-task.frequency}")
public void runAsTask() {
if (env.isEnabled()) {
LOG.info("runAsTask: Storing Event Data, frequency={}", env.getFrequency());
jobLOG.info("---------------------------------------------------------------");
jobLOG.info("Job started: Storing Event Data");
} else {
LOG.debug("runAsTask");
LOG.debug("runAsTask {} not enabled: Storing Event Data, aborted.", jobId);
return;
}
try {
events = getFile();
/* processing of the event data */
jobLOG.info("Okay, send okay mail");
sendOkayMail();
} catch (Exception e) {
LOG.error("{} failed! Exception: {}", jobId, e);
jobLOG.info("Job has errors, send error mail");
sendErrorMail();
} finally {
LOG.info("{} finished", jobId);
jobLOG.info("Job finished");
}
}
public List<Event> getFile() {
LOG.debug("getFile");
final List<Event> events = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
sftpInputChannel.addInterceptor(new ChannelInterceptor() {
// capture the message and count down the latch
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOG.debug("postSend channel={}, sent={}", channel.toString(), sent);
LOG.trace("postSend message={}", message.getPayload());
// read and transform the content
Arrays.stream((Event[]) message.getPayload()).forEach(e -> events.add(e));
// signal: work has done
latch.countDown();
ChannelInterceptor.super.postSend(message, channel, sent);
}
});
boolean sent = controlChannel.send(new GenericMessage<>("@sftpInboundAdapter.start()"));
LOG.debug("getFile 'Start' control message sent successfully={}", sent);
try {
LOG.debug("getFile, waiting for the file to be received, timeout={}s", FILE_GET_TIMEOUT_SEC);
if (latch.await(FILE_GET_TIMEOUT_SEC, TimeUnit.SECONDS)) {
return events;
}
}
catch (InterruptedException e) {
LOG.warn("getFile, job was interrupted");
}
throw new IllegalStateException("expected file not available");
}
有两个问题:
多次添加
ChannelInterceptor
(见LOGpostSend channel=bean 'sftpInputChannel' ...
)。我应该在哪里添加拦截器才能只有一个?或者如何检查是否已添加拦截器?文件是第一次从 SFTP 获取(参见日志中的
时,我必须更改什么才能从 SFTP 获取文件com.jcraft.jsch
行)。所有后续任务周期都不再与 SFTP 主机进行协商。为了在每个周期获取文件,我被告知对由 Map 支持的LocalDirectoryFilter
使用SimpleMetadataStore
并在处理文件后清除此映射,我这样做了。每次任务 运行?
这里是两次启动任务的LOG:
2021-10-09 01:35:52.688 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : The following profiles are active: test,cdsclient
2021-10-09 01:35:53.960 WARN [] --- [http-nio-8080-exec-220] o.s.boot.actuate.endpoint.EndpointId : Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2021-10-09 01:35:54.141 INFO [] --- [http-nio-8080-exec-220] o.s.cloud.context.scope.GenericScope : BeanFactory id=301a6a9f-3809-32e6-b15b-e640dd198297
2021-10-09 01:35:54.149 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-10-09 01:35:54.155 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-10-09 01:35:54.160 INFO [] --- [http-nio-8080-exec-220] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-10-09 01:35:54.282 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.300 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.301 INFO [] --- [http-nio-8080-exec-220] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-10-09 01:35:54.363 INFO [] --- [http-nio-8080-exec-220] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1668 ms
2021-10-09 01:35:54.967 WARN [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : No URLs will be polled as dynamic configuration sources.
2021-10-09 01:35:54.967 INFO [] --- [http-nio-8080-exec-220] c.n.c.sources.URLConfigurationSource : To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2021-10-09 01:35:54.981 INFO [] --- [http-nio-8080-exec-220] c.netflix.config.DynamicPropertyFactory : DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@db52778f
2021-10-09 01:35:55.158 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInputChannel
2021-10-09 01:35:55.224 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlChannel
2021-10-09 01:35:55.476 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferChannel
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : transferContext
2021-10-09 01:35:55.477 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpSessionFactory
2021-10-09 01:35:55.486 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : sftpInboundFlow
2021-10-09 01:35:55.587 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-handler
2021-10-09 01:35:55.588 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : MessageHandler-deleteLocalFileService
2021-10-09 01:35:55.589 DEBUG [] --- [http-nio-8080-exec-220] c.l.c.c.c.sftpConfiguration : controlBus
2021-10-09 01:35:56.271 INFO [] --- [http-nio-8080-exec-220] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:35:56.335 INFO [] --- [http-nio-8080-exec-220] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 2 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 1 subscriber(s).
2021-10-09 01:35:56.336 INFO [] --- [http-nio-8080-exec-220] o.s.i.endpoint.EventDrivenConsumer : started bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:35:56.344 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING
2021-10-09 01:35:56.380 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
2021-10-09 01:35:56.449 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
2021-10-09 01:35:56.550 INFO [] --- [http-nio-8080-exec-220] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml
2021-10-09 01:35:56.722 INFO [] --- [http-nio-8080-exec-220] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Disable delta property : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application is null : false
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Application version is -1: true
2021-10-09 01:35:56.743 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server
2021-10-09 01:35:56.953 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : The response status is 200
2021-10-09 01:35:56.955 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Not registering with Eureka server per configuration
2021-10-09 01:35:56.958 INFO [] --- [http-nio-8080-exec-220] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1633736156957 with initial instances count: 69
2021-10-09 01:35:56.959 INFO [] --- [http-nio-8080-exec-220] o.s.c.n.e.s.EurekaServiceRegistry : Registering application MyJob with eureka with status UP
2021-10-09 01:35:56.986 INFO [] --- [http-nio-8080-exec-220] c.h.potter.job.MyJobApplication : Started MyJobApplication in 5.36 seconds (JVM running for 996008.867)
2021-10-09 01:36:00.000 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:00.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:00.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:00.031 INFO [] --- [pool-240-thread-1] o.s.i.e.SourcePollingChannelAdapter : started bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:00.033 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:00.055 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connecting to my.sftp.server.de port 22
2021-10-09 01:36:00.057 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Connection established
...
2021-10-09 01:36:00.158 INFO [] --- [pool-240-thread-2] o.s.i.s.s.DefaultSftpSessionFactory : The authenticity of host 'my.sftp.server.de' can't be established.
RSA key fingerprint is 4d:fe:f9:35:08:20:2e:76:76:55:7a:1d:5d:5d:1c:90.
Are you sure you want to continue connecting?
2021-10-09 01:36:00.158 WARN [] --- [pool-240-thread-2] com.jcraft.jsch : Permanently added 'my.sftp.server.de' (RSA) to the list of known hosts.
...
2021-10-09 01:36:00.385 INFO [] --- [pool-240-thread-2] com.jcraft.jsch : Authentication succeeded (publickey).
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@24b4484b, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=fa7b1773-d522-9f0f-e8ed-8fcf3bebe23a, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736160447}]
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:00.447 DEBUG [] --- [pool-240-thread-2] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:00.448 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:01.384 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:01.384 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:01.512 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : runAsTask: Storing Event Data, frequency=0/30 * * * * *
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : ---------------------------------------------------------------
2021-10-09 01:36:30.001 INFO [] --- [pool-240-thread-1] job-MyJob : Job started: Storing Event Data
2021-10-09 01:36:30.001 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile 'Start' control message sent successfully=true
2021-10-09 01:36:30.002 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : getFile, waiting for the file to be received, timeout=60s
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : handleMessage message=GenericMessage [payload=[Lcom.harry.potter.job.miscellaneous.Event;@b2cd829f, headers={file_remoteHostPort=my.sftp.server.de:22, file_name=EventData.txt, file_remoteDirectory=/data, file_originalFile=/home/app/sftp-inbound-work/EventData.txt, id=e6558e48-db84-fd2f-2195-c43d3e245127, file_relativePath=EventData.txt, file_remoteFile=EventData.txt, timestamp=1633736190062}]
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.c.c.sftpConfiguration : deleteLocalFileService file=/home/app/sftp-inbound-work/EventData.txt removed
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-3] c.l.c.job.MyJobTask : postSend channel=bean 'sftpInputChannel'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1de2b65c', sent=true
2021-10-09 01:36:30.062 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : createEvents
2021-10-09 01:36:30.394 INFO [] --- [pool-240-thread-1] job-MyJob : Okay, send okay mail
2021-10-09 01:36:30.395 DEBUG [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : sendOkayMail
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] c.l.c.job.MyJobTask : MyJob finished
2021-10-09 01:36:30.457 INFO [] --- [pool-240-thread-1] job-MyJob : Job finished
...
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'sftpInboundAdapter'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {json-to-object-transformer} as a subscriber to the 'sftpInboundFlow.channel#0' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.sftpInboundFlow.channel#0' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/harry/potter/job/config/sftpConfiguration.class]'; from source: 'bean method sftpInboundFlow'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.handler.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 1 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.handler.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {message-handler:sftpConfiguration.deleteLocalFileService.serviceActivator} as a subscriber to the 'sftpInputChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.sftpInputChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.deleteLocalFileService.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : Removing {service-activator:sftpConfiguration.controlBus.serviceActivator} as a subscriber to the 'controlChannel' channel
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.integration.channel.DirectChannel : Channel 'application-1.controlChannel' has 0 subscriber(s).
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.i.endpoint.EventDrivenConsumer : stopped bean 'sftpConfiguration.controlBus.serviceActivator'
2021-10-09 01:38:43.697 INFO [] --- [Catalina-utility-2] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
因为你需要一个倒计时锁存器,你每次都需要一个新的拦截器(或者每次都使用一个带有新锁存器的
AtomicReference
)。你可以打电话给removeInterceptor
.还有一个远程过滤器。
但是,对于这种情况,使用带 GET 命令的出站网关比使用异步入站通道适配器要容易得多。
编辑
示例:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
sf.setPassword(pw);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class)
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
}
}
EDIT2
使用 .route
获取和删除的示例:
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return f -> f
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))));
}
}
@MessagingGateway(defaultRequestChannel = "flow.input",
defaultHeaders = @GatewayHeader(name = "method", expression = "#gatewayMethod.name"))
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}
EDIT3
以上示例仅使用注释定义了网关代理 - 您不能在注释和 DSL 之间混合配置。下面显示了仅使用 DSL 的网关代理配置的相同内容。
@SpringBootApplication
@EnableScheduling
public class So69511643Application {
public static void main(String[] args) {
SpringApplication.run(So69511643Application.class, args);
}
@Bean
DefaultSftpSessionFactory sf(@Value("${host}") String host, @Value("${username}") String user,
@Value("${pw}") String pw, @Value("${pk}") Resource pk) {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost(host);
sf.setUser(user);
// sf.setPassword(pw);
sf.setPrivateKey(pk);
sf.setAllowUnknownKeys(true);
return sf;
}
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.from(Gate.class, g -> g
.header("method", args -> args.getMethod().getName()))
.log()
.route(Message.class, m -> m.getHeaders().get("method", String.class), r -> r
.subFlowMapping("getFoo", f1 -> f1
.handle(Sftp.outboundGateway(sf, Command.GET, "payload")
.localDirectoryExpression("'/tmp'"))
.transform(new FileToStringTransformer())
.transform(Transformers.fromJson(Foo.class)))
.subFlowMapping("remove", f2 -> f2
.handle(Sftp.outboundGateway(sf, Command.RM, "payload"))))
.get();
}
}
interface Gate {
Foo getFoo(String filename);
boolean remove(String filename);
}
@Component
@DependsOn("flow")
class Scheduler {
private static final Logger log = LoggerFactory.getLogger(So69511643Application.class);
@Autowired
Gate gate;
@Scheduled(cron = "0 * * * * *")
public void sched() {
log.info("Fetched: {}", this.gate.getFoo("foo/bar.json"));
new File("/tmp/bar.json").delete();
log.info("Deleted {}", this.gate.remove("foo/bar.json"));
}
}