Spring Batch 远程分块(remote chunking)worker 应用程序启动时,不要自动运行 IntegrationFlow
Do not run the IntegrationFlow when the Spring Batch remote chunking worker application starts
我是 Spring Integration 和 Spring Batch 的新手,想用 master 和 worker 开发一个远程分块(remote chunking)批处理应用程序。我使用 Spring Integration 和 RabbitMQ 作为消息队列,应用程序运行良好,但 worker 端的 ItemProcessor 会自动启动,而我需要控制它何时启动。
/**
 * Worker-side configuration of a remote chunking setup.
 *
 * <p>Messages are read from the "requests" AMQP queue into
 * {@link #requestsChannel()}, which is the input of the worker flow; the
 * worker flow's output, {@link #repliesChannel()}, is drained by an AMQP
 * outbound adapter that uses the routing key "replies".
 */
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig {

    /** Builder used by {@link #workerIntegrationFlow()} to assemble the worker flow. */
    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    /** Channel fed by the inbound AMQP adapter and used as the worker flow's input. */
    @Bean
    public DirectChannel requestsChannel() {
        DirectChannel requests = new DirectChannel();
        return requests;
    }

    /**
     * Flow that forwards messages from the "requests" queue to
     * {@link #requestsChannel()}.
     *
     * @param connectionFactory connection factory handed to the AMQP inbound adapter
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        IntegrationFlow inbound = IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory, "requests"))
                .channel(requestsChannel())
                .get();
        return inbound;
    }

    /** Channel used as the worker flow's output and as the source of the outbound flow. */
    @Bean
    public DirectChannel repliesChannel() {
        // Trace output kept from the original snippet.
        System.out.println("repliesChannel 3 ");
        DirectChannel replies = new DirectChannel();
        return replies;
    }

    /**
     * Flow that sends everything arriving on {@link #repliesChannel()} through an
     * AMQP outbound adapter with the routing key "replies".
     *
     * @param amqpTemplate template handed to the AMQP outbound adapter
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        IntegrationFlow outbound = IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
        return outbound;
    }

    /** Item processor plugged into the worker flow; its body is elided in the snippet. */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        ....
    }

    /** Item writer plugged into the worker flow; its body is elided in the snippet. */
    @Bean
    public ItemWriter<Integer> itemWriter() {
        ...
    }

    /**
     * Worker flow built from the injected builder: it is given the item processor,
     * the item writer, {@link #requestsChannel()} as input and
     * {@link #repliesChannel()} as output.
     *
     * @return the worker integration flow
     */
    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        IntegrationFlow workerFlow = remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
        return workerFlow;
    }
}
那么我可以做些什么来手动启动 worker 部分?
给适配器指定一个 id,并将 autoStartup 设置为 false。
/**
 * Inbound flow whose AMQP adapter is registered under the id "inbound" and is
 * configured with {@code autoStartup(false)}; messages from the "requests"
 * queue are forwarded to {@code requestsChannel()}.
 *
 * @param connectionFactory connection factory handed to the AMQP inbound adapter
 * @return the inbound integration flow
 */
@Bean
public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    IntegrationFlow inboundFlow = IntegrationFlows
            .from(
                    Amqp.inboundAdapter(connectionFactory, "requests")
                            .id("inbound")
                            .autoStartup(false))
            .channel(requestsChannel())
            .get();
    return inboundFlow;
}
然后用 @Autowired 注入该适配器,并在需要时手动启动它:
// Inject the AMQP inbound adapter; presumably this resolves to the adapter
// given the id "inbound" in the inboundFlow definition.
@Autowired
AmqpInboundChannelAdapter inbound;
...
// Start the adapter explicitly, since it was configured with autoStartup(false).
inbound.start();
我是 Spring Integration 和 Spring Batch 的新手,想用 master 和 worker 开发一个远程分块(remote chunking)批处理应用程序。我使用 Spring Integration 和 RabbitMQ 作为消息队列,应用程序运行良好,但 worker 端的 ItemProcessor 会自动启动,而我需要控制它何时启动。
/**
 * Worker-side configuration of a remote chunking setup.
 *
 * <p>Messages are read from the "requests" AMQP queue into
 * {@link #requestsChannel()}, which is the input of the worker flow; the
 * worker flow's output, {@link #repliesChannel()}, is drained by an AMQP
 * outbound adapter that uses the routing key "replies".
 */
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig {

    /** Builder used by {@link #workerIntegrationFlow()} to assemble the worker flow. */
    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    /** Channel fed by the inbound AMQP adapter and used as the worker flow's input. */
    @Bean
    public DirectChannel requestsChannel() {
        DirectChannel requests = new DirectChannel();
        return requests;
    }

    /**
     * Flow that forwards messages from the "requests" queue to
     * {@link #requestsChannel()}.
     *
     * @param connectionFactory connection factory handed to the AMQP inbound adapter
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        IntegrationFlow inbound = IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory, "requests"))
                .channel(requestsChannel())
                .get();
        return inbound;
    }

    /** Channel used as the worker flow's output and as the source of the outbound flow. */
    @Bean
    public DirectChannel repliesChannel() {
        // Trace output kept from the original snippet.
        System.out.println("repliesChannel 3 ");
        DirectChannel replies = new DirectChannel();
        return replies;
    }

    /**
     * Flow that sends everything arriving on {@link #repliesChannel()} through an
     * AMQP outbound adapter with the routing key "replies".
     *
     * @param amqpTemplate template handed to the AMQP outbound adapter
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        IntegrationFlow outbound = IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
        return outbound;
    }

    /** Item processor plugged into the worker flow; its body is elided in the snippet. */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        ....
    }

    /** Item writer plugged into the worker flow; its body is elided in the snippet. */
    @Bean
    public ItemWriter<Integer> itemWriter() {
        ...
    }

    /**
     * Worker flow built from the injected builder: it is given the item processor,
     * the item writer, {@link #requestsChannel()} as input and
     * {@link #repliesChannel()} as output.
     *
     * @return the worker integration flow
     */
    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        IntegrationFlow workerFlow = remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
        return workerFlow;
    }
}
那么我可以做些什么来手动启动 worker 部分?
给适配器指定一个 id,并将 autoStartup 设置为 false。
/**
 * Inbound flow whose AMQP adapter is registered under the id "inbound" and is
 * configured with {@code autoStartup(false)}; messages from the "requests"
 * queue are forwarded to {@code requestsChannel()}.
 *
 * @param connectionFactory connection factory handed to the AMQP inbound adapter
 * @return the inbound integration flow
 */
@Bean
public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    IntegrationFlow inboundFlow = IntegrationFlows
            .from(
                    Amqp.inboundAdapter(connectionFactory, "requests")
                            .id("inbound")
                            .autoStartup(false))
            .channel(requestsChannel())
            .get();
    return inboundFlow;
}
然后用 @Autowired 注入该适配器,并在需要时手动启动它:
// Inject the AMQP inbound adapter; presumably this resolves to the adapter
// given the id "inbound" in the inboundFlow definition.
@Autowired
AmqpInboundChannelAdapter inbound;
...
// Start the adapter explicitly, since it was configured with autoStartup(false).
inbound.start();