如何使 RedisQueueMessageDrivenEndpoint 与 IntegrationFlow 一起工作?
How to make RedisQueueMessageDrivenEndpoint work with IntegrationFlow?
我想知道如何使 RedisQueueMessageDrivenEndpoint 与 IntegrationFlow 一起工作,以便我可以接收从以下代码中指定的列表中出队的消息? "redisRpopChannel()" 似乎根本没有收到任何消息。请帮忙。
@Bean
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) {
RedisOutboundGateway gateway = new RedisOutboundGateway(connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
gateway.setArgumentsSerializer(serializer);
return gateway;
}
@Bean
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) {
ExpressionArgumentsStrategy strategy = new ExpressionArgumentsStrategy(new String[]{"headers.queue", "#cmd == 'LPUSH' ? payload : null"}, true);
strategy.setBeanFactory(beanFactory);
gateway.setArgumentsStrategy(strategy);
return flow -> flow.publishSubscribeChannel(s->s.subscribe(f -> f
.enrich(e -> e.<ObjectNode>requestPayload(m -> {
String partition = m.getHeaders().get("correlationId").toString();
ObjectNode objectNode = m.getPayload();
objectNode.put(PayLoadKeys.PARTITION, partition);
objectNode.put(PayLoadKeys.SEQ, m.getHeaders().get("sequenceNumber").toString());
return objectNode;
}).shouldClonePayload(false)
.header(RedisHeaders.COMMAND, "LPUSH").header("queue", files))
.handle(gateway).channel("redisLpushResponseFlow.input")));
}
@Bean
public IntegrationFlow redisLpushResponseFlow() {
return flow -> flow.resequence().aggregate().<List<Long>>handle((p,h)-> {
ObjectNode objectNode = mapper.createObjectNode();
objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString());
if(h.get("mode").equals("debug")) {
objectNode.set(PayLoadKeys.DEBUG,
mapper.valueToTree(p.stream().collect(Collectors.toList())));
}
return objectNode;
}).channel(httpInboundReplyChannel());
@Bean
public MessageChannel redisRpopChannel() {
return MessageChannels.queue().get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(500).get();
}
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setBeanFactory(beanFactory);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(redisRpopChannel());
endpoint.afterPropertiesSet();
endpoint.start();
return endpoint;
}
@Bean
public IntegrationFlow redisQueuePollingFlow() {
class ThrottledTaskExecutor implements TaskExecutor {
final Semaphore semaphore;
final TaskExecutor taskExecutor;
ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
this.semaphore = new Semaphore(taskExecutor.getCorePoolSize());
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("Task is null in ThrottledTaskExecutor.");
}
doSubmit(task);
}
void doSubmit(final Runnable task) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskRejectedException("Task could not be submitted because of a thread interruption.");
}
try {
taskExecutor.execute(new FutureTask<Void>(task, null) {
@Override
protected void done() {
semaphore.release();
}
});
} catch (TaskRejectedException e) {
semaphore.release();
throw e;
}
}
}
return IntegrationFlows
.from(redisRpopChannel())
.transform(Transformers.fromJson(ObjectNode.class))
.handle(message -> {
ObjectNode p = (ObjectNode) message.getPayload();
ThreadPoolTaskExecutor taskExecutor = taskExecutor();
ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor);
if(p.hasNonNull(PayLoadKeys.ID_ARRAY)) {
String array = p.remove(PayLoadKeys.ID_ARRAY).asText();
if (p.hasNonNull(array)) {
p.remove(array).forEach(id -> {
ObjectNode param = p.deepCopy();
final Long finalId = id.asLong();
param.put("id", finalId);
throttledTaskExecutor.execute(new JobLaunchTask(param));
});
}
} else {
throttledTaskExecutor.execute(new JobLaunchTask(p));
}
taskExecutor.shutdown();
}).get();
}
在 DSL 中使用消息驱动端点(定义为 @Bean
s)时,当前 a problem。
问题是初始化期间需要输出通道。但是,当端点稍后连接到流中时,该通道将被替换。
您不应在 @Bean
定义中调用 afterPropertiesSet()
和 start()
等方法。
这对我有用...
@Bean
public RedisConnectionFactory connectionFactory() {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setPort(6379);
return jedisConnectionFactory;
}
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(new DirectChannel()); // will be replaced
return endpoint;
}
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
return IntegrationFlows.from(redisQueueMessageDrivenEndpoint(connectionFactory))
.handle(System.out::println)
.get();
}
我在redis-cli中用> lpush foo '{"foo":"bar"}'
测试过
编辑
不过,你的技术也很管用(对我来说)...
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(rpopChannel());
return endpoint;
}
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
return IntegrationFlows.from(rpopChannel())
.handle(System.out::println)
.get();
}
@Bean
public MessageChannel rpopChannel() {
return new DirectChannel();
}
再次,我从端点中删除了所有容器管理的属性; Spring 设置所有这些。
我想知道如何使 RedisQueueMessageDrivenEndpoint 与 IntegrationFlow 一起工作,以便我可以接收从以下代码中指定的列表中出队的消息? "redisRpopChannel()" 似乎根本没有收到任何消息。请帮忙。
@Bean
public RedisOutboundGateway redisOutboundGateway(RedisConnectionFactory connectionFactory) {
RedisOutboundGateway gateway = new RedisOutboundGateway(connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
gateway.setArgumentsSerializer(serializer);
return gateway;
}
@Bean
public IntegrationFlow redisLpushRequestFlow(RedisOutboundGateway gateway, BeanFactory beanFactory) {
ExpressionArgumentsStrategy strategy = new ExpressionArgumentsStrategy(new String[]{"headers.queue", "#cmd == 'LPUSH' ? payload : null"}, true);
strategy.setBeanFactory(beanFactory);
gateway.setArgumentsStrategy(strategy);
return flow -> flow.publishSubscribeChannel(s->s.subscribe(f -> f
.enrich(e -> e.<ObjectNode>requestPayload(m -> {
String partition = m.getHeaders().get("correlationId").toString();
ObjectNode objectNode = m.getPayload();
objectNode.put(PayLoadKeys.PARTITION, partition);
objectNode.put(PayLoadKeys.SEQ, m.getHeaders().get("sequenceNumber").toString());
return objectNode;
}).shouldClonePayload(false)
.header(RedisHeaders.COMMAND, "LPUSH").header("queue", files))
.handle(gateway).channel("redisLpushResponseFlow.input")));
}
@Bean
public IntegrationFlow redisLpushResponseFlow() {
return flow -> flow.resequence().aggregate().<List<Long>>handle((p,h)-> {
ObjectNode objectNode = mapper.createObjectNode();
objectNode.put(PayLoadKeys.PARTITION, h.get("correlationId").toString());
if(h.get("mode").equals("debug")) {
objectNode.set(PayLoadKeys.DEBUG,
mapper.valueToTree(p.stream().collect(Collectors.toList())));
}
return objectNode;
}).channel(httpInboundReplyChannel());
@Bean
public MessageChannel redisRpopChannel() {
return MessageChannels.queue().get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(500).get();
}
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory, BeanFactory beanFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(files, connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setBeanFactory(beanFactory);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(redisRpopChannel());
endpoint.afterPropertiesSet();
endpoint.start();
return endpoint;
}
@Bean
public IntegrationFlow redisQueuePollingFlow() {
class ThrottledTaskExecutor implements TaskExecutor {
final Semaphore semaphore;
final TaskExecutor taskExecutor;
ThrottledTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
this.semaphore = new Semaphore(taskExecutor.getCorePoolSize());
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("Task is null in ThrottledTaskExecutor.");
}
doSubmit(task);
}
void doSubmit(final Runnable task) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskRejectedException("Task could not be submitted because of a thread interruption.");
}
try {
taskExecutor.execute(new FutureTask<Void>(task, null) {
@Override
protected void done() {
semaphore.release();
}
});
} catch (TaskRejectedException e) {
semaphore.release();
throw e;
}
}
}
return IntegrationFlows
.from(redisRpopChannel())
.transform(Transformers.fromJson(ObjectNode.class))
.handle(message -> {
ObjectNode p = (ObjectNode) message.getPayload();
ThreadPoolTaskExecutor taskExecutor = taskExecutor();
ThrottledTaskExecutor throttledTaskExecutor = new ThrottledTaskExecutor(taskExecutor);
if(p.hasNonNull(PayLoadKeys.ID_ARRAY)) {
String array = p.remove(PayLoadKeys.ID_ARRAY).asText();
if (p.hasNonNull(array)) {
p.remove(array).forEach(id -> {
ObjectNode param = p.deepCopy();
final Long finalId = id.asLong();
param.put("id", finalId);
throttledTaskExecutor.execute(new JobLaunchTask(param));
});
}
} else {
throttledTaskExecutor.execute(new JobLaunchTask(p));
}
taskExecutor.shutdown();
}).get();
}
在 DSL 中使用消息驱动端点(定义为 @Bean
s)时,当前 a problem。
问题是初始化期间需要输出通道。但是,当端点稍后连接到流中时,该通道将被替换。
您不应在 @Bean
定义中调用 afterPropertiesSet()
和 start()
等方法。
这对我有用...
@Bean
public RedisConnectionFactory connectionFactory() {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setPort(6379);
return jedisConnectionFactory;
}
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(new DirectChannel()); // will be replaced
return endpoint;
}
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
return IntegrationFlows.from(redisQueueMessageDrivenEndpoint(connectionFactory))
.handle(System.out::println)
.get();
}
我在redis-cli中用> lpush foo '{"foo":"bar"}'
测试过
编辑
不过,你的技术也很管用(对我来说)...
@Bean
public RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory connectionFactory) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint("foo", connectionFactory);
Jackson2JsonRedisSerializer<? extends JsonNode> serializer = new Jackson2JsonRedisSerializer<>(JsonNode.class);
endpoint.setSerializer(serializer);
endpoint.setAutoStartup(true);
endpoint.setOutputChannel(rpopChannel());
return endpoint;
}
@Bean
public IntegrationFlow flow(RedisConnectionFactory connectionFactory) {
return IntegrationFlows.from(rpopChannel())
.handle(System.out::println)
.get();
}
@Bean
public MessageChannel rpopChannel() {
return new DirectChannel();
}
再次,我从端点中删除了所有容器管理的属性; Spring 设置所有这些。