是否有开箱即用的 spring 吸收模式来使用动态队列中的消息并处理它们?
is there any out of the box spring ingetration pattern for consuming message from dynamic queue and process them?
我正在尝试使用 spring 集成实现一个场景,它必须处理动态生成的 redis 队列。到目前为止,我在 Internet 上找到的示例适用于预定义的队列。
在我的情况下,应用程序动态生成了 100 多个 redis 队列,我的代码将使用来自这些队列的消息。我已经成功地创建了一个 POC 类型的项目(github link)并且它正在运行。
我想知道是否有更好的方法来实现同样的目的。据我所知,企业集成模式并没有说明从多个动态队列或消息源消费消息,除了自定义或更改现有框架源代码之外,是否有任何开箱即用的解决方案?
相比 xml,我更喜欢使用 Spring 集成 Java 配置和 DSL。
参见Dynamic and Runtime Integration Flows。
To simplify the development experience, Spring Integration introduced IntegrationFlowContext
to register and manage IntegrationFlow
instances at runtime, as the following example shows:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based and we must create it on the fly. ...
编辑
@SpringBootApplication
public class So59117728Application {
public static void main(String[] args) {
SpringApplication.run(So59117728Application.class, args).close();
}
@Bean
public ApplicationRunner runner(RedisConnectionFactory cf, IntegrationFlowContext context,
RedisTemplate<String, String> template) {
return args -> {
IntegrationFlow flow = IntegrationFlows
.from(redisEndpoint("So59117728Application", cf))
.handle(System.out::println)
.get();
context.registration(flow).id("myDynamicFlow").register();
template.boundListOps("So59117728Application").leftPush("foo");
Thread.sleep(10_000);
context.remove("myDynamicFlow");
};
}
private RedisQueueMessageDrivenEndpoint redisEndpoint(String queueName, RedisConnectionFactory cf) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName, cf);
endpoint.setSerializer(new StringRedisSerializer());
return endpoint;
}
}
我正在尝试使用 spring 集成实现一个场景,它必须处理动态生成的 redis 队列。到目前为止,我在 Internet 上找到的示例适用于预定义的队列。
在我的情况下,应用程序动态生成了 100 多个 redis 队列,我的代码将使用来自这些队列的消息。我已经成功地创建了一个 POC 类型的项目(github link)并且它正在运行。
我想知道是否有更好的方法来实现同样的目的。据我所知,企业集成模式并没有说明从多个动态队列或消息源消费消息,除了自定义或更改现有框架源代码之外,是否有任何开箱即用的解决方案?
相比 xml,我更喜欢使用 Spring 集成 Java 配置和 DSL。
参见Dynamic and Runtime Integration Flows。
To simplify the development experience, Spring Integration introduced
IntegrationFlowContext
to register and manageIntegrationFlow
instances at runtime, as the following example shows:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based and we must create it on the fly. ...
编辑
@SpringBootApplication
public class So59117728Application {
public static void main(String[] args) {
SpringApplication.run(So59117728Application.class, args).close();
}
@Bean
public ApplicationRunner runner(RedisConnectionFactory cf, IntegrationFlowContext context,
RedisTemplate<String, String> template) {
return args -> {
IntegrationFlow flow = IntegrationFlows
.from(redisEndpoint("So59117728Application", cf))
.handle(System.out::println)
.get();
context.registration(flow).id("myDynamicFlow").register();
template.boundListOps("So59117728Application").leftPush("foo");
Thread.sleep(10_000);
context.remove("myDynamicFlow");
};
}
private RedisQueueMessageDrivenEndpoint redisEndpoint(String queueName, RedisConnectionFactory cf) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName, cf);
endpoint.setSerializer(new StringRedisSerializer());
return endpoint;
}
}