AggregatingMessageHandler 的手动 ACK
Manual ACK for AggregatingMessageHandler
我正在尝试构建这样的集成流程:Rabbit -> AmqpInboundChannelAdapter(AcknowledgeMode.MANUAL) -> DirectChannel -> AggregatingMessageHandler -> DirectChannel -> AmqpOutboundEndpoint。
我想在内存中聚合消息并在聚合 10 条消息或达到 10 秒超时时释放它。我想这个配置没问题:
/**
 * Declares the aggregator bean that consumes from {@code amqpInputChannel}.
 *
 * <p>Messages are correlated on the {@code AmqpHeaders.CORRELATION_ID} header and held in a
 * {@code SimpleMessageStore} created with a capacity argument of 10. The handler is given a
 * {@code TimeoutCountSequenceSizeReleaseStrategy} (threshold 10, timeout 10 seconds) and a
 * group timeout expression of the same 10 seconds; the aggregated output is sent to
 * {@code amqpOutputChannel()}.
 *
 * @return the configured aggregating handler
 */
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler aggregator() {
    // Single source for the 10 second window used by both the release strategy and the group timeout.
    final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);

    AggregatingMessageHandler handler = new AggregatingMessageHandler(
            new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore(10));
    handler.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy(AmqpHeaders.CORRELATION_ID));

    // Release side. Original author's note: the release strategy's timeout is only checked
    // when a new message arrives, hence the additional group timeout expression below.
    handler.setReleaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(10, timeoutMillis));
    handler.setGroupTimeoutExpression(new ValueExpression<>(timeoutMillis));

    // Defaults to false. Once a group is released by the strategy, drop it so that later
    // messages with the same correlation key start a fresh group.
    handler.setExpireGroupsUponCompletion(true);
    // When a group expires by timeout rather than by the strategy, still emit what was collected.
    handler.setSendPartialResultOnExpiry(true);

    handler.setOutputChannel(amqpOutputChannel());
    return handler;
}
现在,我的问题是 - 除了以这种方式创建我自己的 AggregatingMessageHandler 实现之外,是否有任何更简单的方法来手动确认消息:
public class ManualAckAggregatingMessageHandler extends AbstractCorrelatingMessageHandler {
...
/**
 * Acknowledges a single delivery on the given channel ({@code multiple} flag is {@code false}).
 *
 * @param channel the channel the delivery was received on; checked with {@code Assert.notNull}
 * @param deliveryTag the delivery tag to acknowledge; checked with {@code Assert.notNull}
 * @throws MessagingException wrapping the {@link IOException} if {@code basicAck} fails
 */
private void ackMessage(Channel channel, Long deliveryTag) {
    // The precondition checks cannot raise IOException, so they sit outside the try block.
    Assert.notNull(channel, "Channel must be provided");
    Assert.notNull(deliveryTag, "Delivery tag must be provided");
    try {
        channel.basicAck(deliveryTag, false);
    }
    catch (IOException ex) {
        throw new MessagingException("Cannot ACK message", ex);
    }
}
/**
 * Post-release hook: marks the group complete in the message store, acknowledges every
 * message of the group individually, and then cleans the group up.
 *
 * <p>Cleanup depends on {@code expireGroupsUponCompletion}: when set, the whole group is
 * passed to {@code remove(...)}; otherwise only its messages are cleared from the store.
 *
 * @param messageGroup the group that was just released
 * @param completedMessages not used by this implementation
 */
@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
    final Object groupId = messageGroup.getGroupId();
    final MessageGroupStore store = getMessageStore();
    store.completeGroup(groupId);

    // ACK each original delivery using the channel and tag carried in its headers.
    for (Message<?> message : messageGroup.getMessages()) {
        Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        ackMessage(channel, deliveryTag);
    }

    if (this.expireGroupsUponCompletion) {
        remove(messageGroup);
        return;
    }

    // Group is kept: only drop its messages, using the store-specific shortcut when available.
    if (store instanceof SimpleMessageStore) {
        ((SimpleMessageStore) store).clearMessageGroup(groupId);
        return;
    }
    store.removeMessagesFromGroup(groupId, messageGroup.getMessages());
}
}
更新
在您的帮助下我成功了。最重要的部分:连接工厂必须设置 factory.setPublisherConfirms(true)。AmqpOutboundEndpoint 必须有这两个设置:outboundEndpoint.setConfirmAckChannel(manualAckChannel()) 和 outboundEndpoint.setConfirmCorrelationExpressionString("#root")。以下是其余类的实现:
/**
 * Immutable pairing of a RabbitMQ {@code Channel} with the delivery tag of one message
 * received on it, so that the message can be acknowledged later.
 */
public class ManualAckPair {
    private final Channel channel;
    private final Long deliveryTag;

    /**
     * @param channel the channel the message was delivered on
     * @param deliveryTag the delivery tag of the message on that channel
     */
    public ManualAckPair(Channel channel, Long deliveryTag) {
        this.channel = channel;
        this.deliveryTag = deliveryTag;
    }

    /**
     * Acknowledges this single delivery ({@code multiple} flag is {@code false}).
     *
     * @throws java.io.UncheckedIOException if the channel reports an {@link IOException};
     *         the failure is propagated instead of being printed and ignored, because a
     *         swallowed error would leave the delivery unacknowledged without the caller knowing
     */
    public void basicAck() {
        try {
            this.channel.basicAck(this.deliveryTag, false);
        }
        catch (IOException e) {
            // Fully qualified so the file needs no additional import.
            throw new java.io.UncheckedIOException(
                    "Cannot ACK message with delivery tag " + this.deliveryTag, e);
        }
    }
}
/**
 * Group processor base class that, in addition to the headers aggregated by the superclass,
 * attaches one {@code ManualAckPair} per grouped message to the output headers so the
 * original deliveries can be acknowledged downstream.
 */
public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

    /** Name of the header that carries the list of {@code ManualAckPair} instances. */
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";

    /**
     * Adds the {@link #MANUAL_ACK_PAIRS} header on top of the superclass result.
     *
     * @param group the message group being aggregated
     * @return the superclass headers plus the list of channel/delivery-tag pairs
     */
    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> headers = super.aggregateHeaders(group);

        // One pair per message, built from the channel and delivery tag found in its headers.
        List<ManualAckPair> ackPairs = new ArrayList<>();
        group.getMessages().forEach(message -> ackPairs.add(new ManualAckPair(
                (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL),
                (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG))));

        headers.put(MANUAL_ACK_PAIRS, ackPairs);
        return headers;
    }
}
和
/**
 * Service activator on {@code manualAckChannel} that acknowledges the deliveries listed in
 * the {@code MANUAL_ACK_PAIRS} header of the incoming message.
 */
@Service
public class ManualAckServiceActivator {

    /**
     * Calls {@code basicAck()} on every pair, in list order.
     *
     * @param manualAckPairs the pairs taken from the {@code MANUAL_ACK_PAIRS} header
     */
    @ServiceActivator(inputChannel = "manualAckChannel")
    public void handle(@Header(MANUAL_ACK_PAIRS) List<ManualAckPair> manualAckPairs) {
        for (ManualAckPair pair : manualAckPairs) {
            pair.basicAck();
        }
    }
}
是的,聚合器不需要这么复杂的逻辑。
您只需在聚合器释放消息组之后确认这些消息——在聚合器和 AmqpOutboundEndpoint 之间的服务激活器中进行。
而且您必须在那里调用 basicAck(),并将 multiple 标志设为 true:
@param multiple true to acknowledge all messages up to and
including the supplied delivery tag; false to acknowledge just
the supplied delivery tag.
好吧,为此您肯定需要一个自定义 MessageGroupProcessor
来提取整个批次的最高 AmqpHeaders.DELIVERY_TAG
并将其设置为 header 以输出聚合消息。
您可以扩展 DefaultAggregatingMessageGroupProcessor
并覆盖它的 aggregateHeaders()
:
/**
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
* more advanced conflict-resolution strategies if necessary.
*
* @param group The message group.
* @return The aggregated headers.
*/
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
我正在尝试构建这样的集成流程:Rabbit -> AmqpInboundChannelAdapter(AcknowledgeMode.MANUAL) -> DirectChannel -> AggregatingMessageHandler -> DirectChannel -> AmqpOutboundEndpoint。
我想在内存中聚合消息并在聚合 10 条消息或达到 10 秒超时时释放它。我想这个配置没问题:
/**
 * Declares the aggregator bean that consumes from {@code amqpInputChannel}.
 *
 * <p>Messages are correlated on the {@code AmqpHeaders.CORRELATION_ID} header and held in a
 * {@code SimpleMessageStore} created with a capacity argument of 10. The handler is given a
 * {@code TimeoutCountSequenceSizeReleaseStrategy} (threshold 10, timeout 10 seconds) and a
 * group timeout expression of the same 10 seconds; the aggregated output is sent to
 * {@code amqpOutputChannel()}.
 *
 * @return the configured aggregating handler
 */
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler aggregator() {
    // Single source for the 10 second window used by both the release strategy and the group timeout.
    final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);

    AggregatingMessageHandler handler = new AggregatingMessageHandler(
            new DefaultAggregatingMessageGroupProcessor(), new SimpleMessageStore(10));
    handler.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy(AmqpHeaders.CORRELATION_ID));

    // Release side. Original author's note: the release strategy's timeout is only checked
    // when a new message arrives, hence the additional group timeout expression below.
    handler.setReleaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(10, timeoutMillis));
    handler.setGroupTimeoutExpression(new ValueExpression<>(timeoutMillis));

    // Defaults to false. Once a group is released by the strategy, drop it so that later
    // messages with the same correlation key start a fresh group.
    handler.setExpireGroupsUponCompletion(true);
    // When a group expires by timeout rather than by the strategy, still emit what was collected.
    handler.setSendPartialResultOnExpiry(true);

    handler.setOutputChannel(amqpOutputChannel());
    return handler;
}
现在,我的问题是 - 除了以这种方式创建我自己的 AggregatingMessageHandler 实现之外,是否有任何更简单的方法来手动确认消息:
public class ManualAckAggregatingMessageHandler extends AbstractCorrelatingMessageHandler {
...
/**
 * Acknowledges a single delivery on the given channel ({@code multiple} flag is {@code false}).
 *
 * @param channel the channel the delivery was received on; checked with {@code Assert.notNull}
 * @param deliveryTag the delivery tag to acknowledge; checked with {@code Assert.notNull}
 * @throws MessagingException wrapping the {@link IOException} if {@code basicAck} fails
 */
private void ackMessage(Channel channel, Long deliveryTag) {
    // The precondition checks cannot raise IOException, so they sit outside the try block.
    Assert.notNull(channel, "Channel must be provided");
    Assert.notNull(deliveryTag, "Delivery tag must be provided");
    try {
        channel.basicAck(deliveryTag, false);
    }
    catch (IOException ex) {
        throw new MessagingException("Cannot ACK message", ex);
    }
}
/**
 * Post-release hook: marks the group complete in the message store, acknowledges every
 * message of the group individually, and then cleans the group up.
 *
 * <p>Cleanup depends on {@code expireGroupsUponCompletion}: when set, the whole group is
 * passed to {@code remove(...)}; otherwise only its messages are cleared from the store.
 *
 * @param messageGroup the group that was just released
 * @param completedMessages not used by this implementation
 */
@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
    final Object groupId = messageGroup.getGroupId();
    final MessageGroupStore store = getMessageStore();
    store.completeGroup(groupId);

    // ACK each original delivery using the channel and tag carried in its headers.
    for (Message<?> message : messageGroup.getMessages()) {
        Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        ackMessage(channel, deliveryTag);
    }

    if (this.expireGroupsUponCompletion) {
        remove(messageGroup);
        return;
    }

    // Group is kept: only drop its messages, using the store-specific shortcut when available.
    if (store instanceof SimpleMessageStore) {
        ((SimpleMessageStore) store).clearMessageGroup(groupId);
        return;
    }
    store.removeMessagesFromGroup(groupId, messageGroup.getMessages());
}
}
更新
在您的帮助下我成功了。最重要的部分:连接工厂必须设置 factory.setPublisherConfirms(true)。AmqpOutboundEndpoint 必须有这两个设置:outboundEndpoint.setConfirmAckChannel(manualAckChannel()) 和 outboundEndpoint.setConfirmCorrelationExpressionString("#root")。以下是其余类的实现:
/**
 * Immutable pairing of a RabbitMQ {@code Channel} with the delivery tag of one message
 * received on it, so that the message can be acknowledged later.
 */
public class ManualAckPair {
    private final Channel channel;
    private final Long deliveryTag;

    /**
     * @param channel the channel the message was delivered on
     * @param deliveryTag the delivery tag of the message on that channel
     */
    public ManualAckPair(Channel channel, Long deliveryTag) {
        this.channel = channel;
        this.deliveryTag = deliveryTag;
    }

    /**
     * Acknowledges this single delivery ({@code multiple} flag is {@code false}).
     *
     * @throws java.io.UncheckedIOException if the channel reports an {@link IOException};
     *         the failure is propagated instead of being printed and ignored, because a
     *         swallowed error would leave the delivery unacknowledged without the caller knowing
     */
    public void basicAck() {
        try {
            this.channel.basicAck(this.deliveryTag, false);
        }
        catch (IOException e) {
            // Fully qualified so the file needs no additional import.
            throw new java.io.UncheckedIOException(
                    "Cannot ACK message with delivery tag " + this.deliveryTag, e);
        }
    }
}
/**
 * Group processor base class that, in addition to the headers aggregated by the superclass,
 * attaches one {@code ManualAckPair} per grouped message to the output headers so the
 * original deliveries can be acknowledged downstream.
 */
public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

    /** Name of the header that carries the list of {@code ManualAckPair} instances. */
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";

    /**
     * Adds the {@link #MANUAL_ACK_PAIRS} header on top of the superclass result.
     *
     * @param group the message group being aggregated
     * @return the superclass headers plus the list of channel/delivery-tag pairs
     */
    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> headers = super.aggregateHeaders(group);

        // One pair per message, built from the channel and delivery tag found in its headers.
        List<ManualAckPair> ackPairs = new ArrayList<>();
        group.getMessages().forEach(message -> ackPairs.add(new ManualAckPair(
                (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL),
                (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG))));

        headers.put(MANUAL_ACK_PAIRS, ackPairs);
        return headers;
    }
}
和
/**
 * Service activator on {@code manualAckChannel} that acknowledges the deliveries listed in
 * the {@code MANUAL_ACK_PAIRS} header of the incoming message.
 */
@Service
public class ManualAckServiceActivator {

    /**
     * Calls {@code basicAck()} on every pair, in list order.
     *
     * @param manualAckPairs the pairs taken from the {@code MANUAL_ACK_PAIRS} header
     */
    @ServiceActivator(inputChannel = "manualAckChannel")
    public void handle(@Header(MANUAL_ACK_PAIRS) List<ManualAckPair> manualAckPairs) {
        for (ManualAckPair pair : manualAckPairs) {
            pair.basicAck();
        }
    }
}
是的,聚合器不需要这么复杂的逻辑。
您只需在聚合器释放消息组之后确认这些消息——在聚合器和 AmqpOutboundEndpoint 之间的服务激活器中进行。
而且您必须在那里调用 basicAck(),并将 multiple 标志设为 true:
@param multiple true to acknowledge all messages up to and
including the supplied delivery tag; false to acknowledge just
the supplied delivery tag.
好吧,为此您肯定需要一个自定义 MessageGroupProcessor
来提取整个批次的最高 AmqpHeaders.DELIVERY_TAG
并将其设置为 header 以输出聚合消息。
您可以扩展 DefaultAggregatingMessageGroupProcessor
并覆盖它的 aggregateHeaders()
:
/**
* This default implementation simply returns all headers that have no conflicts among the group. An absent header
* on one or more Messages within the group is not considered a conflict. Subclasses may override this method with
* more advanced conflict-resolution strategies if necessary.
*
* @param group The message group.
* @return The aggregated headers.
*/
protected Map<String, Object> aggregateHeaders(MessageGroup group) {