RabbitMq:历久弥新,不如我所想
RabbitMq: works through times, not as I expect
所以,一步一步,我想做什么:
我收到数据如果发送到另一个系统时出错,那么我想发送数据到rabbitMQ:
@Override
public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
try {
anketaService.updateAnketa(profileId, anketa, profileVersion);
} catch (ClubProNotAvailableException e) {
rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
profileService.updateProfileAnketa(profileId, a, null);
}
}
}
接下来,我要接受这些数据和队列,并尝试在一定的时间间隔再次发送它们。
为此我:
我接受消息
我正在尝试重发:
a) 如果一切顺利,我将其从队列中删除
b) 如果发生错误,我调用容器的停止方法。一段时间后,我使用调度程序调用容器的启动方法
@Override
public void onMessage(Message message, Channel channel) throws IOException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
listenerContainer.stop();
throw new ClubProNotAvailableException();
}
}
public void startContainer() {
listenerContainer.start();
}
我遇到过这样的问题:
消息并不是每次都传递到队列中。有时我不得不多次调用convert And Send方法。
当我从队列中收到消息并发生错误时,我关闭容器,然后当它打开时,队列为空,当我关闭时,我看到这条消息:
2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0
我该如何解决这种情况?
继续提问。
我更正了这样的代码:
@Override
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
Thread.sleep(20000);
}
}
Thread.sleep来这里做实验
我希望当我在 rabbitmq 管理控制台中从队列中抓取一条消息时,我会看到它进入 Unacked 状态,这就是它发生的方式。
然后,当发生错误时,我调用了 basicReject 方法,我希望状态在 basicReject 调用行之后立即变为就绪,但是一旦该方法完全完成它就变为就绪。
未确认状态:
虽然baseReject方法已经起作用了。
为什么会这样?它应该如何工作以及什么机制?为什么在调用 baseReject 方法后消息没有立即就绪(控制台 rabbit 中的状态)?
Closing channel for unresponsive consumer:
这意味着侦听器“卡在”您的代码中 - 您无法从侦听器本身调用 stop()
- container.stop()
等待侦听器退出。您应该改用 stop(() -> log.info("stopped container"))
。
您需要 basicReject
在 catch 案例中 - 容器不会为您处理 MANUAL acks。
如果您 ack/nack 自己发送消息,则必须使用手动确认。
通常让容器负责确认您的消息会更好。
所以,一步一步,我想做什么:
我收到数据如果发送到另一个系统时出错,那么我想发送数据到rabbitMQ:
@Override
public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
try {
anketaService.updateAnketa(profileId, anketa, profileVersion);
} catch (ClubProNotAvailableException e) {
rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
profileService.updateProfileAnketa(profileId, a, null);
}
}
}
接下来,我要接受这些数据和队列,并尝试在一定的时间间隔再次发送它们。
为此我:
我接受消息
我正在尝试重发:
a) 如果一切顺利,我将其从队列中删除
b) 如果发生错误,我调用容器的停止方法。一段时间后,我使用调度程序调用容器的启动方法
@Override
public void onMessage(Message message, Channel channel) throws IOException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
listenerContainer.stop();
throw new ClubProNotAvailableException();
}
}
public void startContainer() {
listenerContainer.start();
}
我遇到过这样的问题:
消息并不是每次都传递到队列中。有时我不得不多次调用convert And Send方法。
当我从队列中收到消息并发生错误时,我关闭容器,然后当它打开时,队列为空,当我关闭时,我看到这条消息:
2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0
我该如何解决这种情况?
继续提问。
我更正了这样的代码:
@Override
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
Thread.sleep(20000);
}
}
Thread.sleep来这里做实验
我希望当我在 rabbitmq 管理控制台中从队列中抓取一条消息时,我会看到它进入 Unacked 状态,这就是它发生的方式。
然后,当发生错误时,我调用了 basicReject 方法,我希望状态在 basicReject 调用行之后立即变为就绪,但是一旦该方法完全完成它就变为就绪。
未确认状态:
虽然baseReject方法已经起作用了。
为什么会这样?它应该如何工作以及什么机制?为什么在调用 baseReject 方法后消息没有立即就绪(控制台 rabbit 中的状态)?
Closing channel for unresponsive consumer:
这意味着侦听器“卡在”您的代码中 - 您无法从侦听器本身调用 stop()
- container.stop()
等待侦听器退出。您应该改用 stop(() -> log.info("stopped container"))
。
您需要 basicReject
在 catch 案例中 - 容器不会为您处理 MANUAL acks。
如果您 ack/nack 自己发送消息,则必须使用手动确认。
通常让容器负责确认您的消息会更好。