处理 Spring-AMQP 中的 403

Acting on 403s in Spring-AMQP

我需要在不同运行时使用可变数量的消费者线程来保证消费者独占性,这些线程从固定数量的队列中消费(其中队列的数量远大于消费者的数量)。

我的一般想法是让每个消费者线程尝试建立一个独占连接以清除队列,并且,如果它在给定的时间段内没有从该队列接收到消息,则将其重定向到另一个队列。

即使临时清除队列,它也有可能在将来再次接收消息,因此不能简单地忘记该队列——相反,消费者应该稍后 return 接收它。为了实现这种轮换,我想我会使用队列队列。当消费者失败时,危险会丢失对队列队列中队列的引用;我认为这似乎可以通过致谢解决,如下所示。

本质上,每个消费者线程都等待从队列中获取一条消息 (A),其中引用了队列 (1);消息 (A) 最初仍未被确认。消费者愉快地尝试清除队列 (1),并且一旦队列 (1) 在给定的时间内保持为空,消费者就会从队列中请求一个新的队列名称。在收到第二条消息 (B) 和对新队列 (2) 的引用后,对队列 (1) 的引用作为新消息 (C) 放回队列的末尾,最后是消息(A) 被承认。

事实上,队列中的队列的至少交付和可能仅交付一次保证几乎让我对这里的普通队列 (1, 2) 独占,但为了确保我绝对不要丢失对队列的引用,我需要将队列 (1) 重新发布为消息 (C) 我确认消息 (A) 之前。这意味着如果服务器在将队列 (1) 重新发布为消息 (C) 之后但在确认 (A) 之前失败,则队列中的队列中可能存在对队列 (1) 的两个引用,并且不再保证排他性。

因此,我需要使用 AMQP 独有的消费者标志,这很棒,但就目前而言,如果我收到“403 ACCESS REFUSED”,我也不想重新发布对队列的引用它,这样重复的引用就不会扩散。

但是,我正在使用 Spring 优秀的 AMQP 库,但我不知道如何使用错误处理程序进行连接。容器上公开的 setErrorHandler 方法似乎不适用于“403 ACCESS REFUSED”错误。

有没有一种方法可以使用我当前使用的框架对 403 采取行动?或者,还有其他方法可以实现我需要的保证吗?我的代码如下。

"monitoring service":

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ListenerMonitoringService {

    private static final Logger log = LoggerFactory.getLogger(ListenerMonitoringService.class);

    private static final Period EXPIRATION_PERIOD = Period.millis(5000);

    private static final long MONTIORING_POLL_INTERVAL = 5000;
    private static final long MONITORING_INITIAL_DELAY = 5000;

    private final Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier;

    private final QueueCoordinator queueCoordinator;
    private final ScheduledExecutorService executorService;

    private final Collection<Record> records;

    public ListenerMonitoringService(Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier,
                                     QueueCoordinator queueCoordinator, ScheduledExecutorService executorService) {
        this.messageListenerContainerSupplier = messageListenerContainerSupplier;
        this.queueCoordinator = queueCoordinator;
        this.executorService = executorService;

        records = new ArrayList<>();
    }

    public void registerAndStart(MessageListener messageListener) {
        Record record = new Record(messageListenerContainerSupplier.get());

        // wrap with listener that updates record
        record.container.setMessageListener((MessageListener) (m -> {
            log.trace("{} consumed a message from {}", record.container, Arrays.toString(record.container.getQueueNames()));
            record.freshen(DateTime.now(DateTimeZone.UTC));
            messageListener.onMessage(m);
        }));

        record.container.setErrorHandler(e -> {
            log.error("{} received an {}", record.container, e);
            // this doesn't get called for 403s
        });

        // initial start up
        executorService.execute(() -> {
            String queueName = queueCoordinator.getQueueName();

            log.debug("Received queue name {}", queueName);
            record.container.setQueueNames(queueName);

            log.debug("Starting container {}", record.container);
            record.container.start();

            // background monitoring thread
            executorService.scheduleAtFixedRate(() -> {
                log.debug("Checking container {}", record.container);
                if (record.isStale(DateTime.now(DateTimeZone.UTC))) {
                    String newQueue = queueCoordinator.getQueueName();
                    String oldQueue = record.container.getQueueNames()[0];
                    log.debug("Switching queues for {} from {} to {}", record.container, oldQueue, newQueue);
                    record.container.setQueueNames(newQueue);

                    queueCoordinator.markSuccessful(queueName);
                }
            }, MONITORING_INITIAL_DELAY, MONTIORING_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        });

        records.add(record);
    }

    private static class Record {
        private static final DateTime DATE_TIME_MIN = new DateTime(0);

        private final AbstractMessageListenerContainer container;
        private Optional<DateTime> lastListened;

        private Record(AbstractMessageListenerContainer container) {
            this.container = container;
            lastListened = Optional.empty();
        }

        public synchronized boolean isStale(DateTime now) {
            log.trace("Comparing now {} to {} for {}", now, lastListened, container);
            return lastListened.orElse(DATE_TIME_MIN).plus(EXPIRATION_PERIOD).isBefore(now);
        }

        public synchronized void freshen(DateTime now) {
            log.trace("Updating last listened to {} for {}", now, container);
            lastListened = Optional.of(now);
        }
    }
}

"queue-of-queues" 处理程序:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

private class MetaQueueCoordinator implements QueueCoordinator {

    private static final Logger log = LoggerFactory.getLogger(MetaQueueCoordinator.class);

    private final Channel channel;
    private final Map<String, Envelope> envelopeMap;
    private final RabbitTemplate rabbitTemplate;

    public MetaQueueCoordinator(ConnectionFactory connectionFactory) {
        Connection connection = connectionFactory.createConnection();
        channel = connection.createChannel(false);

        envelopeMap = new ConcurrentHashMap<>();
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("");
        rabbitTemplate.setRoutingKey("queue_of_queues");
    }

    @Override
    public String getQueueName() {
        GetResponse response;
        try {
            response = channel.basicGet("queue_of_queues", false);
        } catch (IOException e) {
            log.error("Unable to get from channel");
            throw new RuntimeException(e);
        }

        String queueName = new String(response.getBody());
        envelopeMap.put(queueName, response.getEnvelope());

        return queueName;
    }

    @Override
    public void markSuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        log.debug("Putting {} at the end of the line...", queueName);
        rabbitTemplate.convertAndSend(queueName);

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName);
        }
    }

    @Override
    public void markUnsuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName);
        }
    }
}

ErrorHandler 用于处理消息传递期间的错误,而不是设置侦听器本身。

即将发布的 1.5 版本 publishes application events 出现此类异常时。

将于今年夏天晚些时候发布;此功能目前仅在 1.5.0.BUILD-SNAPSHOT 中可用;在接下来的几周内应该会发布候选版本。

project page 展示了如何从快照存储库中获取快照。