Spring 使用 StompBrokerRelay (RabbitMQ) 时关闭期间的 Websocket 异常

Spring Websocket exception during close when using StompBrokerRelay (RabbitMQ)

将 RabbitMQ 与 Spring WebSocket 一起使用时,在 WebSocket 关闭期间 Reactor 会抛出如下异常:

2015-03-30 11:37:23.647 [reactor-tcp-io-1] DEBUG o.s.m.s.s.StompBrokerRelayMessageHandler - Failure while clearing TCP connection state in session 39cb9vbm
java.util.NoSuchElementException: null
    at com.gs.collections.impl.list.mutable.MutableIterator.next(MutableIterator.java:57) ~[gs-collections-5.1.0.jar:na]
    at com.gs.collections.impl.list.mutable.MultiReaderFastList$UntouchableListIterator.next(MultiReaderFastList.java:1267) ~[gs-collections-5.1.0.jar:na]
    at reactor.event.registry.CachingRegistry.value(CachingRegistry.java:70) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.registry.CachingRegistry.value(CachingRegistry.java:67) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at com.gs.collections.impl.list.mutable.MultiReaderFastList.withWriteLockAndDelegate(MultiReaderFastList.java:179) ~[gs-collections-5.1.0.jar:na]
    at reactor.event.registry.CachingRegistry.unregister(CachingRegistry.java:67) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel.close(AbstractNetChannel.java:185) [reactor-net-1.1.5.RELEASE.jar:na]
    at org.springframework.messaging.tcp.reactor.Reactor11TcpConnection.close(Reactor11TcpConnection.java:63) ~[spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.clearConnection(StompBrokerRelayMessageHandler.java:808) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.afterDisconnectSent(StompBrokerRelayMessageHandler.java:777) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.access00(StompBrokerRelayMessageHandler.java:496) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.onSuccess(StompBrokerRelayMessageHandler.java:749) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.onSuccess(StompBrokerRelayMessageHandler.java:745) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.success(ListenableFutureCallbackRegistry.java:119) [spring-core-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter.accept(AbstractPromiseToListenableFutureAdapter.java:57) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at reactor.core.action.CallbackAction.doAccept(CallbackAction.java:36) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.action.Action.accept(Action.java:52) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.action.Action.accept(Action.java:32) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ArgumentConvertingConsumerInvoker.invoke(ArgumentConvertingConsumerInvoker.java:73) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ConsumerFilteringEventRouter.route(ConsumerFilteringEventRouter.java:78) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.SynchronousDispatcher.dispatch(SynchronousDispatcher.java:75) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:242) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:249) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:57) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Composable.notifyValue(Composable.java:494) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Promise.notifyValue(Promise.java:571) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Deferred.acceptEvent(Deferred.java:118) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Deferred.accept(Deferred.java:83) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel.operationComplete(NettyNetChannel.java:101) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel.operationComplete(NettyNetChannel.java:89) [reactor-net-1.1.5.RELEASE.jar:na]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at reactor.net.netty.NettyNetChannel.write(NettyNetChannel.java:89) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel.write(NettyNetChannel.java:83) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel.write(AbstractNetChannel.java:239) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel$WriteConsumer.accept(AbstractNetChannel.java:308) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.accept(Reactor.java:373) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.accept(Reactor.java:370) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ArgumentConvertingConsumerInvoker.invoke(ArgumentConvertingConsumerInvoker.java:73) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ConsumerFilteringEventRouter.route(ConsumerFilteringEventRouter.java:103) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.AbstractLifecycleDispatcher.route(AbstractLifecycleDispatcher.java:64) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.AbstractMultiThreadDispatcher$MultiThreadTask.run(AbstractMultiThreadDispatcher.java:55) [reactor-core-1.1.5.RELEASE.jar:na]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]

如果使用 SimpleBroker 则不会出现此异常。

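我们正在使用以下版本(取自上面堆栈跟踪中的 jar 版本信息):

- Spring (spring-core / spring-messaging) 4.1.6.RELEASE
- Reactor (reactor-core / reactor-net) 1.1.5.RELEASE
- Netty (netty-all) 4.0.20.Final
- GS Collections 5.1.0
- Java 1.8.0_31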

我们的配置非常简单:

/**
 * STOMP-over-WebSocket messaging configuration.
 *
 * <p>Registers a SockJS-enabled {@code /stomp} endpoint and picks the message broker based on the
 * {@code rabbitmq.enabled} property: a STOMP broker relay when it is {@code true}, the in-process
 * simple broker otherwise. Both brokers handle the {@code /topic} and {@code /queue} destinations.
 */
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {
    @Autowired
    private SessionRepository sessionRepository;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    // ':' (not '=') separates the property key from its default value in a Spring placeholder.
    // With '=', the whole text "rabbitmq.enabled=true" is treated as the property name and the
    // intended default of true is never applied.
    @Value("${rabbitmq.enabled:true}")
    private boolean rabbitMqEnabled;

    /**
     * Exposes the STOMP endpoint at {@code /stomp} with SockJS fallback, using a Tomcat-specific
     * request upgrade strategy for the WebSocket handshake.
     *
     * @param registry the registry the endpoint is added to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp")
                .setHandshakeHandler(new DefaultHandshakeHandler(new TomcatRequestUpgradeStrategy()))
                // NOTE(review): sessionRepositoryInterceptor() is not visible in this snippet;
                // presumably it is declared in the elided part of the class.
                .addInterceptors(sessionRepositoryInterceptor())
                .withSockJS();
    }

    /**
     * Selects the broker implementation and sets {@code /ws} as the application destination prefix.
     *
     * @param registry the registry the broker is configured on
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        if (rabbitMqEnabled) {
            // Relay /topic and /queue to the external STOMP broker; connection values are redacted.
            registry.enableStompBrokerRelay("/topic", "/queue")
                    .setClientLogin("....").setClientPasscode("....")
                    .setSystemLogin("....").setSystemPasscode("....")
                    .setRelayHost("....").setRelayPort(....)
                    .setVirtualHost("....");
        } else {
            registry.enableSimpleBroker("/topic", "/queue");
        }

        registry.setApplicationDestinationPrefixes("/ws");
    }

    ...
}

有什么想法或指导吗?

我还创建了一个 bug 工单用于跟踪此问题。

已被 Spring 开发人员标记为预期行为。

对于遇到此问题的人:该异常是以 DEBUG 级别记录的,因此将 StompBrokerRelayMessageHandler 的日志级别调整为 INFO(或更高级别)即可不再输出它。

对于 Logback:

<logger name="org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler" level="INFO"/>