Spring 流后的 DSL 句柄?
Spring DSL handle after flow?
我正在尝试配置以下流程:尝试在消息到达 Rabbit 队列时获取锁,为某些文件查询远程文件服务器,并为每个找到并释放的文件向另一个队列发送一条新消息发送所有文件后锁定。
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.filter(m -> lockService.acquire())
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.aggregate()
.handle(m -> {
log.info("Releasing lock");
lock.release();
})
.get();
问题是流程在第一个 .handle
方法之后停止(老实说,正如预期的那样),我不知道如何配置它来做我想做的事?我尝试使用 .wireTap
和 .publishSubscribeChannel
但这使得 2 个流程彼此不依赖并且我的锁在文件实际发送之前被释放。
如果有人可以帮助我解释如何使用 DSL 修复它,那就太好了,因为我正在动态创建这些流程...
- 编辑 -
我在通道上设置拦截器的尝试:
final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lock.release();
}
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(channel)
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get();
但是通过这种方式获取、释放锁,然后才获取消息。我做错了什么?
- 编辑 2 -
从 Gitter 聊天的帮助中弄清楚,以防其他人卡住:
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lockService.release();
}
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get());
pub/sub 拆分后,在一个子流上使用 AMQP 处理程序,在另一个子流上使用聚合器应该可以正常工作。
每个都将在同一个线程上连续调用,最后一条消息导致聚合器再次在同一个线程上释放。
话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以便在发生错误时释放锁定。
编辑
一个不太复杂的解决方案是在转换而不是过滤器之前在通道上自定义 ChannelInterceptor
,以锁定 preSend()
并在 afterSendCompleted()
中释放它(这被称为成功和失败)。
我正在尝试配置以下流程:尝试在消息到达 Rabbit 队列时获取锁,为某些文件查询远程文件服务器,并为每个找到并释放的文件向另一个队列发送一条新消息发送所有文件后锁定。
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.filter(m -> lockService.acquire())
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.aggregate()
.handle(m -> {
log.info("Releasing lock");
lock.release();
})
.get();
问题是流程在第一个 .handle
方法之后停止(老实说,正如预期的那样),我不知道如何配置它来做我想做的事?我尝试使用 .wireTap
和 .publishSubscribeChannel
但这使得 2 个流程彼此不依赖并且我的锁在文件实际发送之前被释放。
如果有人可以帮助我解释如何使用 DSL 修复它,那就太好了,因为我正在动态创建这些流程...
- 编辑 -
我在通道上设置拦截器的尝试:
final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lock.release();
}
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(channel)
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get();
但是通过这种方式获取、释放锁,然后才获取消息。我做错了什么?
- 编辑 2 -
从 Gitter 聊天的帮助中弄清楚,以防其他人卡住:
IntegrationFlows.from(Amqp.inboundGateway(container)
.messageConverter(messageConverter)
)
.channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
lockService.acquire();
return message;
}
@Override
public void afterSendCompletion(final Message<?> message, final MessageChannel channel, final boolean sent, final Exception ex) {
lockService.release();
}
}))
.transform(m -> remoteFileTemplate.list(inputDirectory))
.split()
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeName("")
.routingKey(routingKey)
.get());
pub/sub 拆分后,在一个子流上使用 AMQP 处理程序,在另一个子流上使用聚合器应该可以正常工作。
每个都将在同一个线程上连续调用,最后一条消息导致聚合器再次在同一个线程上释放。
话虽如此,您将需要在入站网关上进行一些 errorChannel 处理,以便在发生错误时释放锁定。
编辑
一个不太复杂的解决方案是在转换而不是过滤器之前在通道上自定义 ChannelInterceptor
,以锁定 preSend()
并在 afterSendCompleted()
中释放它(这被称为成功和失败)。