Java RabbitMQ 连接已经关闭

Java RabbitMQ connection is already closed

我需要将消息推送到外部rabbitmq。我的 java 配置成功声明要推送的队列,但每次我尝试推送时,都会出现下一个异常:

web_1       | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1       |   at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685)
web_1       |   at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675)
web_1       |   at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
web_1       |   at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1       |   at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1       |   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1       |   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1       |   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1       |   at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
web_1       |   at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1       |   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1       |   at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1       |   at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
web_1       |   at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1       |   at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1       |   at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1       |   at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1       |   at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1       |   at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1       |   at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1       |   at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1       |   at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1       |   at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1       |   at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1       |   at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1       |   at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1       |   at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1       |   at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1       |   at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1       |   at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1       |   at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1       |   at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1       |   at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1       |   at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1       |   at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1       |   at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1       |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1       |   at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1       |   at java.base/java.lang.Thread.run(Thread.java:829)

豆子:

@Bean("mqChannel")
    public Channel channel() {
        ConnectionFactory factory = new ConnectionFactory();

        String uri = "amqp://sxtswmgm:dh1N5aBEUnam53urt2VMrF9HSi7IDWAf@stingray.rmq.cloudamqp.com/sxtswmgm";

        try {
            factory.setUri(uri);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
//        factory.setHost("rabbitmq");
//        factory.setPort(5672);
//        factory.setUsername("guest");
//        factory.setPassword("guest");

        Channel channel = null;

        try (Connection connection = factory.newConnection()){
            channel = connection.createChannel();
            channel.queueDeclare("logiweb", false, false, false, null);
        }
        catch(Exception e) {
            log.info("EXCEPTION IN BEAN");
            e.printStackTrace();
        }

        return channel;
    }

发送消息服务class:

@Service
@Slf4j
public class MQServiceImpl implements MQService {
    Channel channel;

    @Autowired
    public MQServiceImpl(
            @Qualifier("mqChannel") Channel channel) {
        this.channel = channel;
    }

    /**
     * sending message to mq
     * @param message message to send
     */
    @Override
    public void send(String message) {
        log.info("send(message) method was called");
        try {
            channel.basicPublish("", "logiweb", null, message.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

顺便说一下,当我在本地启动 rabbitmq 时,它工作得很好。当我使用容器化的 rabbitmq 或外部 cloudamqp

时出现此问题

所以,再一次,当我 运行 我的应用程序时,我可以在 RabbitMQ 仪表板中看到新队列(已声明),但每次推送消息的尝试都以异常结束。

我很难理解该代码是如何组合在一起的,但我认为这部分绝对是错误的:

    try (Connection connection = factory.newConnection()){
        channel = connection.createChannel();
        channel.queueDeclare("logiweb", false, false, false, null);
    }

try 块完成时,connection 资源将自动关闭。但是 Connection.close()javadoc 声明:

"Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. Waits for all the close operations to complete."

因此,您创建的 Channel 将在 channel() 方法 returns 之前关闭。