Spring Messaging/ActiveMQ 未收到已确认消息
Spring Messaging/ActiveMQ Acknowledged Message not received
我已经研究这个问题一段时间了,决定寻求帮助。
我有以下情况。 ActiveMQ 服务器侦听端口 61614。两个 WebSocketStompClient 连接到以下队列;
客户 1:
/queue/request/server1 - /queue/replyto/server1
客户2:
/queue/request/server2 - /queue/replyto/server2
2台服务器相互通信和请求信息。
我对这些场景没有问题。
将请求从服务器 1 发送到服务器 2 队列并在服务器 1 响应队列上接收响应。
有点像这样:I don't have 10 reputations...
Sending SEND {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], receipt=[3]} etc..
Received MESSAGE {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519] etc...
此处执行的随机应用程序逻辑...
Sending SEND {destination=[/queue/replyto/server1] ,session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Received MESSAGE {destination=[/queue/replyto/server1], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
但是,如果您尝试在响应第一个请求之前向服务器 1 的请求队列发送另一条消息,则会出现问题。响应被发送到服务器 2 的响应队列,但从未收到。
图片在这里:Never receive the response( 3. RESPONSE) to the request (2. REQUEST)
Sending SEND { destination=[/queue/replyto/server2], session=[c504b2fe-ae63-1bc2-87ce-651682b7c98e], receipt=[4], etc.
Received RECEIPT {receipt-id=[4]} session=c256762b-ddef-4109-8a3e-04bde832ed85
我希望它足够清楚,如果需要更多解释,请告诉我。
其他信息是,我确定消息已按照写入队列的方式发送,并且我可以看到。
Queues
如您所见,所有消息都是dequeued/acknowledged。
值得一提的是,所有这些都是通过 docker ActiveMQ 服务器和模拟开发服务器上发生的单元测试在本地完成的。
使用 Wireshark,我可以看到消息已被 server2 确认。
send frame
tcp trace
最后:
这是两个服务器的设置,它是相同的:
客户:
WebSocketClient transport = new StandardWebSocketClient();
WebSocketStompClient server1 = new WebSocketStompClient(transport);
server1.setMessageConverter(new MappingJackson2MessageConverter());
server1.setTaskScheduler(taskScheduler);
server1.setDefaultHeartbeat(heartbeat);
server1.connect(bindAddress, Server1SessionHandler);
处理程序:
public class Server1SessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
logger.debug("Entering RemoteSessionHandler after connected method");
this.stompSession = session;
if (session.isConnected()) {
session.setAutoReceipt(true);
logger.trace("Attempting to subscribe to channel {} using the RequestFrameHandler ", this.subscribeChannel);
session.subscribe(this.subscribeChannel, new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
logger.trace("Attempting to subscribe to channel {} using the ResponseFrameHandler ", this.replyQueue);
session.subscribe(this.replyQueue, new Server1ResponseFrameHandler(this.cache));
publisher.publishEvent(new ConnectionSuccessEvent(Server1SessionHandler.class, "RemoteSessionHandler"));
}
else {
logger.error("Could not connect to stomp Session {}", session.toString());
}
}
}
请求帧处理程序:
public class Server1RequestFrameHandler implements StompFrameHandler {
private StompSession session;
public Server1RequestFrameHandler(final StompSession session) {
this.session = session;
}
@Override
@SuppressWarnings("static-access")
public void handleFrame(final StompHeaders headers, final Object payload) {
...... BUSINESS LOGIC ......
if (session.isConnected()) {
session.send(header, response);
logger.debug("Successfully connected using the stompsession {}, ", session.toString());
}
}
@Override
public Type getPayloadType(final StompHeaders headers) {
return Request.class;
}
}
响应帧处理程序:
public class Server1ResponseFrameHandler implements StompFrameHandler {
private Server1Cache cache;
public Server1ResponseFrameHandler(final Server1Cache cache) {
this.cache = cache;
}
public void handleFrame(final StompHeaders headers, final Object payload) {
Response response = (Response) payload;
logger.debug("response: {}", response);
cache.cacheMessage(id, response);
}
public Type getPayloadType(final StompHeaders headers) {
return Response.class;
}
}
如果您需要更多信息,请告诉我。
我可以通过使用 2 个不同的 WebSocket 连接设置订阅主题来绕过这个问题。如:
创建请求客户端
WebSocketClient requestTransport = new StandardWebSocketClient();
WebSocketStompClient requestClient = new WebSocketStompClient(requestTransport);
requestClient.setMessageConverter(new MappingJackson2MessageConverter());
requestClient.setTaskScheduler(taskScheduler);
requestClient.setDefaultHeartbeat(heartbeat);
requestClient.connect(bindAddress, RequestSessionHandler);
创建单独的响应客户端
WebSocketClient responseTransport = new StandardWebSocketClient();
WebSocketStompClient responseClient = new WebSocketStompClient(responseTransport);
responseClient.setMessageConverter(new MappingJackson2MessageConverter());
responseClient.setTaskScheduler(taskScheduler);
responseClient.setDefaultHeartbeat(heartbeat);
responseClient.connect(bindAddress, ResponseSessionHandler);
其中请求和响应会话处理程序仅订阅 1 个队列而不是两者。
public class RequestSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
...
if (session.isConnected()) {
session.subscribe("requestChannel", new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
....
}
}
}
如果有人解释为什么仅使用 1 个 WebSocket 客户端订阅两个队列时会发生这种情况,我很想听听。
我的理论是,为 websocket 连接提供服务的线程正忙于请求,无法处理发送的响应。
我已经研究这个问题一段时间了,决定寻求帮助。
我有以下情况。 ActiveMQ 服务器侦听端口 61614。两个 WebSocketStompClient 连接到以下队列;
客户 1: /queue/request/server1 - /queue/replyto/server1
客户2: /queue/request/server2 - /queue/replyto/server2
2台服务器相互通信和请求信息。 我对这些场景没有问题。 将请求从服务器 1 发送到服务器 2 队列并在服务器 1 响应队列上接收响应。
有点像这样:I don't have 10 reputations...
Sending SEND {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], receipt=[3]} etc..
Received MESSAGE {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519] etc...
此处执行的随机应用程序逻辑...
Sending SEND {destination=[/queue/replyto/server1] ,session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Received MESSAGE {destination=[/queue/replyto/server1], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
但是,如果您尝试在响应第一个请求之前向服务器 1 的请求队列发送另一条消息,则会出现问题。响应被发送到服务器 2 的响应队列,但从未收到。
图片在这里:Never receive the response( 3. RESPONSE) to the request (2. REQUEST)
Sending SEND { destination=[/queue/replyto/server2], session=[c504b2fe-ae63-1bc2-87ce-651682b7c98e], receipt=[4], etc.
Received RECEIPT {receipt-id=[4]} session=c256762b-ddef-4109-8a3e-04bde832ed85
我希望它足够清楚,如果需要更多解释,请告诉我。
其他信息是,我确定消息已按照写入队列的方式发送,并且我可以看到。
Queues
如您所见,所有消息都是dequeued/acknowledged。
值得一提的是,所有这些都是通过 docker ActiveMQ 服务器和模拟开发服务器上发生的单元测试在本地完成的。
使用 Wireshark,我可以看到消息已被 server2 确认。
send frame
tcp trace
最后:
这是两个服务器的设置,它是相同的:
客户:
WebSocketClient transport = new StandardWebSocketClient();
WebSocketStompClient server1 = new WebSocketStompClient(transport);
server1.setMessageConverter(new MappingJackson2MessageConverter());
server1.setTaskScheduler(taskScheduler);
server1.setDefaultHeartbeat(heartbeat);
server1.connect(bindAddress, Server1SessionHandler);
处理程序:
public class Server1SessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
logger.debug("Entering RemoteSessionHandler after connected method");
this.stompSession = session;
if (session.isConnected()) {
session.setAutoReceipt(true);
logger.trace("Attempting to subscribe to channel {} using the RequestFrameHandler ", this.subscribeChannel);
session.subscribe(this.subscribeChannel, new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
logger.trace("Attempting to subscribe to channel {} using the ResponseFrameHandler ", this.replyQueue);
session.subscribe(this.replyQueue, new Server1ResponseFrameHandler(this.cache));
publisher.publishEvent(new ConnectionSuccessEvent(Server1SessionHandler.class, "RemoteSessionHandler"));
}
else {
logger.error("Could not connect to stomp Session {}", session.toString());
}
}
}
请求帧处理程序:
public class Server1RequestFrameHandler implements StompFrameHandler {
private StompSession session;
public Server1RequestFrameHandler(final StompSession session) {
this.session = session;
}
@Override
@SuppressWarnings("static-access")
public void handleFrame(final StompHeaders headers, final Object payload) {
...... BUSINESS LOGIC ......
if (session.isConnected()) {
session.send(header, response);
logger.debug("Successfully connected using the stompsession {}, ", session.toString());
}
}
@Override
public Type getPayloadType(final StompHeaders headers) {
return Request.class;
}
}
响应帧处理程序:
public class Server1ResponseFrameHandler implements StompFrameHandler {
private Server1Cache cache;
public Server1ResponseFrameHandler(final Server1Cache cache) {
this.cache = cache;
}
public void handleFrame(final StompHeaders headers, final Object payload) {
Response response = (Response) payload;
logger.debug("response: {}", response);
cache.cacheMessage(id, response);
}
public Type getPayloadType(final StompHeaders headers) {
return Response.class;
}
}
如果您需要更多信息,请告诉我。
我可以通过使用 2 个不同的 WebSocket 连接设置订阅主题来绕过这个问题。如: 创建请求客户端
WebSocketClient requestTransport = new StandardWebSocketClient();
WebSocketStompClient requestClient = new WebSocketStompClient(requestTransport);
requestClient.setMessageConverter(new MappingJackson2MessageConverter());
requestClient.setTaskScheduler(taskScheduler);
requestClient.setDefaultHeartbeat(heartbeat);
requestClient.connect(bindAddress, RequestSessionHandler);
创建单独的响应客户端
WebSocketClient responseTransport = new StandardWebSocketClient();
WebSocketStompClient responseClient = new WebSocketStompClient(responseTransport);
responseClient.setMessageConverter(new MappingJackson2MessageConverter());
responseClient.setTaskScheduler(taskScheduler);
responseClient.setDefaultHeartbeat(heartbeat);
responseClient.connect(bindAddress, ResponseSessionHandler);
其中请求和响应会话处理程序仅订阅 1 个队列而不是两者。
public class RequestSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
...
if (session.isConnected()) {
session.subscribe("requestChannel", new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
....
}
}
}
如果有人解释为什么仅使用 1 个 WebSocket 客户端订阅两个队列时会发生这种情况,我很想听听。
我的理论是,为 websocket 连接提供服务的线程正忙于请求,无法处理发送的响应。