多线程 Spring 集成 DSL
Multithreading Spring integration DSL
我想创建一个带有 Spring 集成的简单 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的多个队列获取消息并将消息发布到不同的 Rest 端点。
我想知道我是否可以将其并行化。
我有两种情况想检查可行性:
- 第一个我想为每个 RabbitMq 队列创建一个线程
收到消息后会监听并执行流程:
Scenario 1
- 第二种情况:在这种情况下,我想为每个队列创建一个动态数量的线程,线程数量根据消息数量增加或减少。
Scenario 2
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
对于第一种情况,只需为每个配置一个入站适配器并将输出通道设置为下游流的公共通道。
对于第二种情况,只需在侦听器容器上设置 concurrentConsumers
和 maxConcurrentConsumers
,它将根据需要扩展 up/down 线程。
我想创建一个带有 Spring 集成的简单 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的多个队列获取消息并将消息发布到不同的 Rest 端点。
我想知道我是否可以将其并行化。
我有两种情况想检查可行性:
- 第一个我想为每个 RabbitMq 队列创建一个线程 收到消息后会监听并执行流程:
Scenario 1
- 第二种情况:在这种情况下,我想为每个队列创建一个动态数量的线程,线程数量根据消息数量增加或减少。
Scenario 2
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
对于第一种情况,只需为每个配置一个入站适配器并将输出通道设置为下游流的公共通道。
对于第二种情况,只需在侦听器容器上设置 concurrentConsumers
和 maxConcurrentConsumers
,它将根据需要扩展 up/down 线程。