Do not run the IntegrationFlow when a Spring Batch remote chunking worker application starts

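I am new to Spring Integration and Spring Batch, and I want to develop a remote chunking batch application with a master and a worker. I use Spring Integration with RabbitMQ for the message queue. The application works fine, but the worker's ItemProcessor starts automatically, and I need to control when it starts.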

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig {


    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests"))
                .channel(requestsChannel())
                .get();
    }

    @Bean
    public DirectChannel repliesChannel() {
        System.out.println("repliesChannel 3 ");
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
       ....
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
       ...
    }

    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        return this.remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
    }


}

So what can I do to start the worker part manually?

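Give the adapter an id and set autoStartup to false. The inbound adapter then does not consume from the "requests" queue until it is started explicitly, so no chunks reach the worker's ItemProcessor before that.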

@Bean
public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(connectionFactory,"requests")
                .id("inbound")
                .autoStartup(false))
            .channel(requestsChannel())
            .get();
}

Then @Autowired the adapter and start it when you are ready...

@Autowired
AmqpInboundChannelAdapter inbound;

...
    inbound.start();
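
As a minimal sketch of where that `inbound.start()` call could live, the adapter can be wrapped in a small control bean that other code calls when the worker should begin consuming. The class name `WorkerControl` and its method names are hypothetical and not part of the original answer. The sketch assumes the `inboundFlow` definition above, with exactly one `AmqpInboundChannelAdapter` in the context, so that injection by type is unambiguous.

```java
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.stereotype.Component;

// Hypothetical helper (name chosen for illustration): exposes manual
// start/stop of the inbound adapter that was declared with autoStartup(false).
@Component
public class WorkerControl {

    private final AmqpInboundChannelAdapter inbound;

    // Assumes a single AmqpInboundChannelAdapter bean (the one with id "inbound").
    public WorkerControl(AmqpInboundChannelAdapter inbound) {
        this.inbound = inbound;
    }

    // Begin consuming chunk requests from the "requests" queue.
    public void startWorker() {
        if (!this.inbound.isRunning()) {
            this.inbound.start();
        }
    }

    // Stop consuming; requests that arrive afterwards stay in the queue.
    public void stopWorker() {
        if (this.inbound.isRunning()) {
            this.inbound.stop();
        }
    }
}
```

Calling `startWorker()` from whatever trigger suits the application (a REST endpoint, a scheduled task, an application event listener) would then control when the worker starts processing. The `isRunning()` guards are only there so that a repeated call is visibly a no-op.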