无法从 Spring 引导连接到 rabbitmq STOMP
Cannot connect to rabbitmq STOMP from Spring boot
我使用了启用了 STOMP 的 RabbitMQ docker 图像。使用以下配置,当我尝试 运行 我的 Spring 启动应用程序时,出现异常。
StackTrace:
2020-11-21 16:03:07.620 INFO 28504 --- [ient-loop-nio-1] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session system: Failed to connect: Connection refused: /127.0.0.1:61613
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:61613
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_242]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) ~[na:1.8.0_242]
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
Dockerfile
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_stomp
EXPOSE 61613
Rabbitmq 容器的日志对我来说没问题。
WebSocketConfig.java 看起来像:
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-connection")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayPort(61613)
.setRelayHost("127.0.0.1")
.setClientPasscode("guest")
.setClientLogin("guest");
registry.setApplicationDestinationPrefixes("/ws");
}
}
pom.xml
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置有什么问题?谁能帮帮我?
我认为您在为客户端公开 rabbitmq stomp 端口 61613
时犯了一个错误。顺便说一句,我用它适用于我的类似配置进行了测试。
要实现,请查看我在 GitHub 上的演示应用程序或阅读以下详细信息。
Dockerfile
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_stomp
EXPOSE 15671 15672 61613
服务器实现
消息契约
public class ZbytesMessage {
private String from;
private String text;
...getters and setters...
}
WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/zsockets")
.setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest");
config.setApplicationDestinationPrefixes("/zbytes");
}
}
网络控制器
@Controller
public class ZbytesController {
private static final Logger LOG = LoggerFactory.getLogger(ZbytesController.class);
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public ZbytesMessage greeting(ZbytesMessage msg) throws Exception {
Thread.sleep(1000); // simulated delay
LOG.info("Received : {} from: {} ", msg.getText(), msg.getFrom());
return msg;
}
}
服务器运行ner
@SpringBootApplication
public class ServerRunner {
public static void main(String[] args) {
SpringApplication.run(ServerRunner.class, args);
}
}
客户端实现
public class HelloClient {
private static final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
private static final Logger LOG = LoggerFactory.getLogger(HelloClient.class);
public static void main(String[] args) throws Exception {
HelloClient helloClient = new HelloClient();
ListenableFuture<StompSession> f = helloClient.connect();
StompSession stompSession = f.get();
LOG.info("Subscribing to greeting topic using session {}", stompSession);
helloClient.subscribeGreetings(stompSession);
LOG.info("Sending hello message {}", stompSession);
helloClient.sendHello(stompSession);
Thread.sleep(60000);
}
public ListenableFuture<StompSession> connect() {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
String url = "ws://{host}:{port}/zsockets";
return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
}
public void subscribeGreetings(StompSession stompSession) {
stompSession.subscribe("/topic/greetings", new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
LOG.info("Received greeting {}", new String((byte[]) o));
}
});
}
public void sendHello(StompSession stompSession) {
String jsonHello = "{ \"from\" : \"suraj\", \"text\" : \"Hi zbytes!\" }";
stompSession.send("/zbytes/hello", jsonHello.getBytes());
}
private static class MyHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
LOG.info("Now connected");
}
}
}
到运行
- 构建 docker 映像并 运行 它(不要忘记公开端口 61613)。 (注:我更喜欢docker-compose.yaml)
docker build -t zbytes/rabbitmq .
docker run -p61613:61613 zbytes/rabbitmq
- 运行
ServerRunner
java 主要 class.
- 运行
HelloClient
java 主要 class.
服务器输出
i.g.zbytes.demo.server.ZbytesController : Received : Hi zbytes! from: suraj
客户端输出
Received greeting {"from":"suraj","text":"Hi zbytes!"}
我使用了启用了 STOMP 的 RabbitMQ docker 图像。使用以下配置,当我尝试 运行 我的 Spring 启动应用程序时,出现异常。
StackTrace:
2020-11-21 16:03:07.620 INFO 28504 --- [ient-loop-nio-1] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session system: Failed to connect: Connection refused: /127.0.0.1:61613
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:61613 Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_242] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) ~[na:1.8.0_242] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
Dockerfile
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_stomp
EXPOSE 61613
Rabbitmq 容器的日志对我来说没问题。
WebSocketConfig.java 看起来像:
@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-connection")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayPort(61613)
.setRelayHost("127.0.0.1")
.setClientPasscode("guest")
.setClientLogin("guest");
registry.setApplicationDestinationPrefixes("/ws");
}
}
pom.xml
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置有什么问题?谁能帮帮我?
我认为您在为客户端公开 rabbitmq stomp 端口 61613
时犯了一个错误。顺便说一句,我用它适用于我的类似配置进行了测试。
要实现,请查看我在 GitHub 上的演示应用程序或阅读以下详细信息。
Dockerfile
FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_stomp
EXPOSE 15671 15672 61613
服务器实现
消息契约
public class ZbytesMessage {
private String from;
private String text;
...getters and setters...
}
WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/zsockets")
.setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest");
config.setApplicationDestinationPrefixes("/zbytes");
}
}
网络控制器
@Controller
public class ZbytesController {
private static final Logger LOG = LoggerFactory.getLogger(ZbytesController.class);
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public ZbytesMessage greeting(ZbytesMessage msg) throws Exception {
Thread.sleep(1000); // simulated delay
LOG.info("Received : {} from: {} ", msg.getText(), msg.getFrom());
return msg;
}
}
服务器运行ner
@SpringBootApplication
public class ServerRunner {
public static void main(String[] args) {
SpringApplication.run(ServerRunner.class, args);
}
}
客户端实现
public class HelloClient {
private static final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
private static final Logger LOG = LoggerFactory.getLogger(HelloClient.class);
public static void main(String[] args) throws Exception {
HelloClient helloClient = new HelloClient();
ListenableFuture<StompSession> f = helloClient.connect();
StompSession stompSession = f.get();
LOG.info("Subscribing to greeting topic using session {}", stompSession);
helloClient.subscribeGreetings(stompSession);
LOG.info("Sending hello message {}", stompSession);
helloClient.sendHello(stompSession);
Thread.sleep(60000);
}
public ListenableFuture<StompSession> connect() {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
String url = "ws://{host}:{port}/zsockets";
return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
}
public void subscribeGreetings(StompSession stompSession) {
stompSession.subscribe("/topic/greetings", new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
LOG.info("Received greeting {}", new String((byte[]) o));
}
});
}
public void sendHello(StompSession stompSession) {
String jsonHello = "{ \"from\" : \"suraj\", \"text\" : \"Hi zbytes!\" }";
stompSession.send("/zbytes/hello", jsonHello.getBytes());
}
private static class MyHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
LOG.info("Now connected");
}
}
}
到运行
- 构建 docker 映像并 运行 它(不要忘记公开端口 61613)。 (注:我更喜欢docker-compose.yaml)
docker build -t zbytes/rabbitmq .
docker run -p61613:61613 zbytes/rabbitmq
- 运行
ServerRunner
java 主要 class. - 运行
HelloClient
java 主要 class.
服务器输出
i.g.zbytes.demo.server.ZbytesController : Received : Hi zbytes! from: suraj
客户端输出
Received greeting {"from":"suraj","text":"Hi zbytes!"}