如何停止轮询 InboundChannelAdapter
How To Stop Polling InboundChannelAdapter
我使用 RotatingServerAdvice 从 1 个服务器中的 2 个不同目录轮询文件并且工作正常,问题是一旦我启动 inboundtest.start (),我就无法停止轮询。主要思路是把那些目录下的所有文件都取出来,然后发送inboundtest.stop(),代码是这样的
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
//factory.setTestSession(true);
return factory;
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sftpRemoteDirectory);
fileSynchronizer.setFilter(new SftpRegexPatternFileListFilter(".*?\.(txt|TXT?)"));
return fileSynchronizer;
}
@Bean(name = "sftpMessageSource")
@EndpointId("inboundtest")
@InboundChannelAdapter(channel = "sftpChannel",poller = @Poller("fileReadingMessageSourcePollerMetadata"), autoStartup = "false")
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(sftpLocalDirectoryDownloadUpload));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
public DelegatingSessionFactory<LsEntry> sessionFactory() {
Map<Object, SessionFactory<LsEntry>> factories = new LinkedHashMap<>();
factories.put("one", sftpSessionFactory());
// use the first SF as the default
return new DelegatingSessionFactory<LsEntry>(factories, factories.values().iterator().next());
}
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectory));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectoryNonUpload));
return new RotatingServerAdvice(sessionFactory(), keyDirectories, false);
}
@Bean
MessageChannel controlChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
@Bean
public PollerMetadata fileReadingMessageSourcePollerMetadata() {
PollerMetadata meta = new PollerMetadata();
meta.setTrigger(new PeriodicTrigger(1000));
meta.setAdviceChain(List.of(advice()));
meta.setMaxMessagesPerPoll(1);
meta.setErrorHandler(throwable -> new IOException());
return meta;
}
Allways 正在等待 2 个目录之一中的新文件,但这不是想法,想法是在检索到所有文件后停止轮询
从另一个 class 我通过这里的控制通道调用 inbound.start() 代码:
@Autowired
private MessageChannel controlChannel;
public void startProcessingFiles() throws InterruptedException {
controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
}
我尝试停止使用这个 class 但不起作用
@Component
public class StopPollingAdvice implements ReceiveMessageAdvice {
@Autowired
private MessageChannel controlChannel;
@Override
public Message<?> afterReceive(Message<?> message, Object o) {
System.out.println("There is no more files, stopping connection" + message.getPayload());
if(message == null) {
System.out.println("There is no more files, stopping connection" + message.getPayload());
Message operation = MessageBuilder.withPayload("@inboundtest.stop()").build();
controlChannel.send(operation);
}
return message;
}
}
好的。现在我明白你的意思了。 RotatingServerAdvice
仅在第一个服务器耗尽时才会移动到其他服务器(默认情况下,请参阅 fair
选项)。所以,当你在建议中停止它时,它不能再去其他目录获取。您需要考虑其他一些停止解决方案。与建议和此 afterReceive()
无关的内容,在您流程的下游某处...
或者您可以提供自定义 RotationPolicy
(StandardRotationPolicy
的扩展名)并在其覆盖的 afterReceive()
中检查所有已处理的目录,然后发送停止命令。
我使用 RotatingServerAdvice 从 1 个服务器中的 2 个不同目录轮询文件并且工作正常,问题是一旦我启动 inboundtest.start (),我就无法停止轮询。主要思路是把那些目录下的所有文件都取出来,然后发送inboundtest.stop(),代码是这样的
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
//factory.setTestSession(true);
return factory;
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sftpRemoteDirectory);
fileSynchronizer.setFilter(new SftpRegexPatternFileListFilter(".*?\.(txt|TXT?)"));
return fileSynchronizer;
}
@Bean(name = "sftpMessageSource")
@EndpointId("inboundtest")
@InboundChannelAdapter(channel = "sftpChannel",poller = @Poller("fileReadingMessageSourcePollerMetadata"), autoStartup = "false")
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(sftpLocalDirectoryDownloadUpload));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
public DelegatingSessionFactory<LsEntry> sessionFactory() {
Map<Object, SessionFactory<LsEntry>> factories = new LinkedHashMap<>();
factories.put("one", sftpSessionFactory());
// use the first SF as the default
return new DelegatingSessionFactory<LsEntry>(factories, factories.values().iterator().next());
}
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectory));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectoryNonUpload));
return new RotatingServerAdvice(sessionFactory(), keyDirectories, false);
}
@Bean
MessageChannel controlChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
@Bean
public PollerMetadata fileReadingMessageSourcePollerMetadata() {
PollerMetadata meta = new PollerMetadata();
meta.setTrigger(new PeriodicTrigger(1000));
meta.setAdviceChain(List.of(advice()));
meta.setMaxMessagesPerPoll(1);
meta.setErrorHandler(throwable -> new IOException());
return meta;
}
Allways 正在等待 2 个目录之一中的新文件,但这不是想法,想法是在检索到所有文件后停止轮询
从另一个 class 我通过这里的控制通道调用 inbound.start() 代码:
@Autowired
private MessageChannel controlChannel;
public void startProcessingFiles() throws InterruptedException {
controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
}
我尝试停止使用这个 class 但不起作用
@Component
public class StopPollingAdvice implements ReceiveMessageAdvice {
@Autowired
private MessageChannel controlChannel;
@Override
public Message<?> afterReceive(Message<?> message, Object o) {
System.out.println("There is no more files, stopping connection" + message.getPayload());
if(message == null) {
System.out.println("There is no more files, stopping connection" + message.getPayload());
Message operation = MessageBuilder.withPayload("@inboundtest.stop()").build();
controlChannel.send(operation);
}
return message;
}
}
好的。现在我明白你的意思了。 RotatingServerAdvice
仅在第一个服务器耗尽时才会移动到其他服务器(默认情况下,请参阅 fair
选项)。所以,当你在建议中停止它时,它不能再去其他目录获取。您需要考虑其他一些停止解决方案。与建议和此 afterReceive()
无关的内容,在您流程的下游某处...
或者您可以提供自定义 RotationPolicy
(StandardRotationPolicy
的扩展名)并在其覆盖的 afterReceive()
中检查所有已处理的目录,然后发送停止命令。