How to stop polling an InboundChannelAdapter

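I'm using RotatingServerAdvice to poll files from 2 different directories on 1 server, and that works fine. The problem is that once I start the adapter with inboundtest.start(), I can't stop the polling. The main idea is to fetch all the files from those directories and then send inboundtest.stop(). The code looks like this: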

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        //factory.setTestSession(true);
        return factory;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(true);
        fileSynchronizer.setRemoteDirectory(sftpRemoteDirectory);
        fileSynchronizer.setFilter(new SftpRegexPatternFileListFilter(".*?\\.(txt|TXT?)"));
        return fileSynchronizer;
    }


    @Bean(name = "sftpMessageSource")
    @EndpointId("inboundtest")
    @InboundChannelAdapter(channel = "sftpChannel",poller = @Poller("fileReadingMessageSourcePollerMetadata"), autoStartup = "false")
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File(sftpLocalDirectoryDownloadUpload));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return source;
    }

    @Bean
    public DelegatingSessionFactory<LsEntry> sessionFactory() {
        Map<Object, SessionFactory<LsEntry>> factories = new LinkedHashMap<>();
        factories.put("one", sftpSessionFactory());
        // use the first SF as the default
        return new DelegatingSessionFactory<LsEntry>(factories, factories.values().iterator().next());
    }

    @Bean
    public RotatingServerAdvice advice() {
        List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectory));
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectoryNonUpload));
        return new RotatingServerAdvice(sessionFactory(), keyDirectories, false);
    }

    @Bean
    MessageChannel controlChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

    @Bean
    public PollerMetadata fileReadingMessageSourcePollerMetadata() {
        PollerMetadata meta = new PollerMetadata();
        meta.setTrigger(new PeriodicTrigger(1000));
        meta.setAdviceChain(List.of(advice()));
        meta.setMaxMessagesPerPoll(1);
        meta.setErrorHandler(throwable -> new IOException());
        return meta;
    }

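The adapter always keeps waiting for a new file in one of the 2 directories, but that is not the idea. The idea is to stop polling once all the files have been retrieved.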

From another class I call inboundtest.start() through the control channel, with this code:

    @Autowired
    private MessageChannel controlChannel;

    public void startProcessingFiles() throws InterruptedException {
        controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
    }

I tried to stop it with this class, but it does not work:

    @Component
    public class StopPollingAdvice implements ReceiveMessageAdvice {

        @Autowired
        private MessageChannel controlChannel;

        @Override
        public Message<?> afterReceive(Message<?> message, Object o) {
            // message is null when the poll returned nothing, so check it before dereferencing
            if (message == null) {
                System.out.println("There are no more files, stopping connection");
                Message<String> operation = MessageBuilder.withPayload("@inboundtest.stop()").build();
                controlChannel.send(operation);
            }
            return message;
        }
    }

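OK. Now I see your point. The RotatingServerAdvice moves on to the next server only when the first one is exhausted (that is the default; see the fair option). So when you stop the adapter from your advice, it can no longer go on to fetch from the other directory. You need to think about some other stop solution: something that is not tied to the advice and this afterReceive(), somewhere downstream in your flow...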

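Or you can provide a custom RotationPolicy (an extension of StandardRotationPolicy) and, in its overridden afterReceive(), check that all directories have been processed and only then send the stop command.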
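
The "check that all directories have been processed" part could look roughly like the sketch below. It is plain Java with no Spring types, so it only illustrates the bookkeeping. The class name DrainTracker, its method names, and the directory strings are hypothetical and not part of Spring Integration. The assumption is that a directory counts as processed once a poll against it returns nothing, and that the stop action fires once every configured directory has been seen empty.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical helper (not a Spring Integration class): remembers which
 * directories have come back empty and runs a stop action once all have.
 */
final class DrainTracker {

    private final Set<String> allDirectories;
    private final Set<String> emptyDirectories = new HashSet<>();
    private final Runnable stopAction;

    DrainTracker(List<String> directories, Runnable stopAction) {
        this.allDirectories = new HashSet<>(directories);
        this.stopAction = stopAction;
    }

    /** Call once per poll with the directory just polled and whether a file arrived. */
    synchronized void afterReceive(String directory, boolean messageReceived) {
        if (messageReceived) {
            // The directory still had a file, so it is not drained yet.
            emptyDirectories.remove(directory);
            return;
        }
        emptyDirectories.add(directory);
        if (emptyDirectories.containsAll(allDirectories)) {
            // Every directory has been seen empty: reset and stop.
            emptyDirectories.clear();
            stopAction.run();
        }
    }

    public static void main(String[] args) {
        DrainTracker tracker = new DrainTracker(
                List.of("dirA", "dirB"),
                () -> System.out.println("all directories drained, stopping adapter"));
        tracker.afterReceive("dirA", true);   // file fetched from dirA
        tracker.afterReceive("dirA", false);  // dirA is now empty
        tracker.afterReceive("dirB", true);   // file fetched from dirB
        tracker.afterReceive("dirB", false);  // dirB is empty too, stop action runs
    }
}
```

In a StandardRotationPolicy subclass, the overridden afterReceive(boolean messageReceived, MessageSource<?> source) would presumably pass the directory that was just polled to such a tracker. That directory should be captured before delegating to super, since the policy may rotate to the next one. The stop action would then send "@inboundtest.stop()" to controlChannel. The custom policy would also have to be handed to the RotatingServerAdvice in place of the key/directory list.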