Spring 集成轮询器

Spring integration poller

美好的一天!抱歉我的英语不好)

我尝试在 Spring 集成中检查 Poller Channel 的工作,但出现错误: 当我使用 DirectChannel(没有轮询器)时一切都很好,但是当我将 pollerChannel 与队列一起使用时,我会出错。

@SpringBootApplication
public class IntegrationSysoutApplication {

    @Bean
    DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    PollableChannel pollerChannel() {
        return new QueueChannel();
    }

/*    @MessagingGateway
    public interface MyGateway {

        @Gateway(requestChannel = "animalFlow.input")
        void process(Animal animal);
    }*/


    @Bean
    public IntegrationFlow animalFlow() {
        return IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))
                .filter((GenericSelector<Animal>) animal -> !animal.getAnimalType().equals("cat"))
                .handle("bService", "process")
                .handle("cService", "process")
                .handle("aService", "process")
                .channel("outputChannel")
                .get();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication
                .run(IntegrationSysoutApplication.class, args);
        DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class);
        outputChannel
                .subscribe(message -> System.out.println("SUPER_MESSAGE: " + message));

        //Отправляем сообщение в gateway
       /* ctx.getBean(MyGateway.class).process(new Animal("lion"));
        ctx.getBean(MyGateway.class).process(new Animal("cat"));
        ctx.getBean(MyGateway.class).process(new Animal("penguin"));*/
       
        //Отправляем сообщение сразу в очередь потока, без gateway
        MessageChannel inputChannel = ctx.getBean("pollerChannel", PollableChannel.class);
        inputChannel.send(MessageBuilder.withPayload(new Animal("penguin")).build());
        inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
        inputChannel.send(MessageBuilder.withPayload(new Animal("dog")).build());

        ctx.close();
    }

}

此时我出错,无法为队列添加轮询器:

IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))

Error:(47, 32) java: no suitable method found for from(java.lang.String,(x)->x.pol[...]100)))
    method org.springframework.integration.dsl.IntegrationFlows.from(java.lang.String,boolean) is not applicable...      

在这种情况下如何使用 Poller 作为 pollingChannel?

轮询器配置在 PollableChannel 的使用者上进行 - 在这种情况下,

.filter(..., e -> e.poller...)

消费者“轮询”频道以获取新消息。