RabbitMq:历久弥新,不如我所想

RabbitMq: works through times, not as I expect

所以,一步一步,我想做什么:

我收到数据如果发送到另一个系统时出错,那么我想发送数据到rabbitMQ:

    @Override
    public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
        try {
            anketaService.updateAnketa(profileId, anketa, profileVersion);
        } catch (ClubProNotAvailableException e) {
            rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
            Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
            profileService.updateProfileAnketa(profileId, a, null);
        }
    }
}

接下来,我要接受这些数据和队列,并尝试在一定的时间间隔再次发送它们。

为此我:

  1. 我接受消息

  2. 我正在尝试重发:

    a) 如果一切顺利,我将其从队列中删除

    b) 如果发生错误,我调用容器的停止方法。一段时间后,我使用调度程序调用容器的启动方法

   @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        ClubProNotAvailableRabbit data = null;
        try {
            data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
            MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
            requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
            methodCall(data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (ClubProNotAvailableException e) {
            listenerContainer.stop();
            throw new ClubProNotAvailableException();
        }
    }

     public void startContainer() {
        listenerContainer.start();
    }

我遇到过这样的问题:

  1. 消息并不是每次都传递到队列中。有时我不得不多次调用convert And Send方法。

  2. 当我从队列中收到消息并发生错误时,我关闭容器,然后当它打开时,队列为空,当我关闭时,我看到这条消息:

2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0

我该如何解决这种情况?

继续提问。

我更正了这样的代码:

   @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
        ClubProNotAvailableRabbit data = null;
        try {
            data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
            MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
            requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
            methodCall(data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (ClubProNotAvailableException e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            Thread.sleep(20000);
        } 
    }

Thread.sleep来这里做实验

我希望当我在 rabbitmq 管理控制台中从队列中抓取一条消息时,我会看到它进入 Unacked 状态,这就是它发生的方式。

然后,当发生错误时,我调用了 basicReject 方法,我希望状态在 basicReject 调用行之后立即变为就绪,但是一旦该方法完全完成它就变为就绪。

未确认状态:

rabbitMq admin console

虽然baseReject方法已经起作用了。

为什么会这样?它应该如何工作以及什么机制?为什么在调用 baseReject 方法后消息没有立即就绪(控制台 rabbit 中的状态)?

Closing channel for unresponsive consumer:

这意味着侦听器“卡在”您的代码中 - 您无法从侦听器本身调用 stop() - container.stop() 等待侦听器退出。您应该改用 stop(() -> log.info("stopped container"))

您需要 basicReject 在 catch 案例中 - 容器不会为您处理 MANUAL acks。

如果您 ack/nack 自己发送消息,则必须使用手动确认。

通常让容器负责确认您的消息会更好。