Spring Websocket STOMP:发送 RECEIPT(回执)帧

Spring Websocket STOMP: send RECEIPT frames

我有一个基于 Spring 及其 SimpleBroker 实现的 Websocket-stomp 服务器(不使用外部代理)。

我想启用 STOMP RECEIPT 消息。

我如何配置我的代码来自动发送这些?

在 STOMP 协议的 Spring 集成测试中,我们有以下代码:

    /**
     * Emulates the STOMP RECEIPT frame, which SimpleBrokerMessageHandler does not support.
     *
     * <p>For every {@code SessionSubscribeEvent} whose message carries a receipt header, the
     * wrapped inbound headers are re-labelled with the RECEIPT command and the matching
     * receipt id, then sent with an empty payload on the given channel.
     *
     * @param clientOutboundChannel channel on which the emulated RECEIPT is sent
     * @return the listener that performs the emulation
     */
    @Bean
    public ApplicationListener<SessionSubscribeEvent> webSocketEventListener(
            final AbstractSubscribableChannel clientOutboundChannel) {
        return subscribeEvent -> {
            StompHeaderAccessor headers = StompHeaderAccessor.wrap(subscribeEvent.getMessage());
            String requestedReceipt = headers.getReceipt();
            if (requestedReceipt == null) {
                // The client did not ask for a receipt; nothing to send.
                return;
            }
            headers.setHeader("stompCommand", StompCommand.RECEIPT);
            headers.setReceiptId(requestedReceipt);
            Message<byte[]> receiptFrame =
                    MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
            clientOutboundChannel.send(receiptFrame);
        };
    }

https://github.com/spring-projects/spring-integration/blob/master/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/inbound/StompInboundChannelAdapterWebSocketIntegrationTests.java

Artem Bilan 提供的答案仅适用于 SUBSCRIBE 帧。下面是另一种方法,可以捕获任何带有 receipt(回执)头的传入帧。只有带有 @EnableWebSocketMessageBroker 注解的类需要扩展,其他类(比如带有 @Controller 注解的类)保持不变。

</p> <pre><code>import java.util.logging.Level; import java.util.logging.Logger; import java.util.Map; import java.util.List; import java.util.HashMap; import java.util.ArrayList; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.messaging.simp.config.SimpleBrokerRegistration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.MessageChannel; import org.springframework.beans.factory.annotation.Autowired; @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { private static final Logger LOGGER = Logger.getLogger( WebSocketConfig.class.getName() ); private MessageChannel outChannel; @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors( new InboundMessageInterceptor() ); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.interceptors( new OutboundMessageInterceptor() ); } @Override public void configureMessageBroker(MessageBrokerRegistry config) { // prefixes are application-dependent config.enableSimpleBroker("/topic"); config.setApplicationDestinationPrefixes("/app"); } @Override public void 
registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/note"); } class InboundMessageInterceptor extends ChannelInterceptorAdapter { @SuppressWarnings("unchecked") public Message preSend(Message message, MessageChannel channel) { LOGGER.log( Level.SEVERE, "preSend: "+message ); GenericMessage genericMessage = (GenericMessage)message; MessageHeaders headers = genericMessage.getHeaders(); String simpSessionId = (String)headers.get( "simpSessionId" ); if( ( SimpMessageType.MESSAGE.equals( headers.get( "simpMessageType" ) ) && StompCommand.SEND.equals( headers.get( "stompCommand" ) ) ) || ( SimpMessageType.SUBSCRIBE.equals( headers.get( "simpMessageType" ) ) && StompCommand.SUBSCRIBE.equals( headers.get( "stompCommand" ) ) ) && ( simpSessionId != null ) ) { Map<String,List<String>> nativeHeaders = (Map<String,List<String>>)headers.get( "nativeHeaders" ); if( nativeHeaders != null ) { List<String> receiptList = nativeHeaders.get( "receipt" ); if( receiptList != null ) { String rid = (String)receiptList.get(0); LOGGER.log( Level.SEVERE, "receipt requested: "+rid ); sendReceipt( rid, simpSessionId ); } } } return message; } private void sendReceipt( String rid, String simpSessionId ) { if( outChannel != null ) { HashMap<String,Object> rcptHeaders = new HashMap<String,Object>(); rcptHeaders.put( "simpMessageType", SimpMessageType.OTHER ); rcptHeaders.put( "stompCommand", StompCommand.RECEIPT ); rcptHeaders.put( "simpSessionId", simpSessionId ); HashMap<String,List<String>> nativeHeaders = new HashMap<String,List<String>>(); ArrayList<String> receiptElements = new ArrayList<String>(); receiptElements.add( rid ); nativeHeaders.put( "receipt-id", receiptElements ); rcptHeaders.put( "nativeHeaders",nativeHeaders ); GenericMessage<byte[]> rcptMsg = new GenericMessage<byte[]>( new byte[0],new MessageHeaders( rcptHeaders ) ); outChannel.send( rcptMsg ); } else LOGGER.log( Level.SEVERE, "receipt NOT sent" ); } } class OutboundMessageInterceptor extends 
ChannelInterceptorAdapter { public void postSend(Message message, MessageChannel channel, boolean sent) { LOGGER.log( Level.SEVERE, "postSend: "+message ); outChannel = channel; } } }

的确,它比应该的要复杂得多,而且获取outChannel也不是很优雅。但它有效。 :-)

与 Artem Bilan 的回答类似的解决方案,使用单独的类来实现监听器。

/**
 * Listens for {@code SessionSubscribeEvent}s and, when the subscribing message asked for a
 * receipt, sends a freshly built RECEIPT message on the client outbound channel.
 */
@Component
public class SubscribeListener implements ApplicationListener<SessionSubscribeEvent> {

    /** Channel used to send the RECEIPT message. */
    @Autowired
    AbstractSubscribableChannel clientOutboundChannel;

    /**
     * Replies with a RECEIPT carrying the requested receipt id and the session id of the
     * event's message; does nothing when no receipt header is present.
     *
     * @param event the subscribe event to inspect
     */
    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) {
        StompHeaderAccessor inbound = StompHeaderAccessor.wrap(event.getMessage());
        String requestedReceipt = inbound.getReceipt();
        if (requestedReceipt == null) {
            return;
        }

        StompHeaderAccessor outbound = StompHeaderAccessor.create(StompCommand.RECEIPT);
        outbound.setReceiptId(requestedReceipt);
        outbound.setSessionId(inbound.getSessionId());

        Message<byte[]> receiptFrame =
                MessageBuilder.createMessage(new byte[0], outbound.getMessageHeaders());
        clientOutboundChannel.send(receiptFrame);
    }
}

以上所有发送回执帧的时间都过早。 以下内容可以满足您的需求。

参考:https://github.com/spring-projects/spring-framework/issues/21848

@Configuration
static class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private MessageChannel outChannel;

    @Autowired
    public WebSocketConfig(MessageChannel clientOutboundChannel) {
        this.outChannel = clientOutboundChannel;
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        registration.interceptors(new ExecutorChannelInterceptor() {

            @Override
            public void afterMessageHandled(Message<?> inMessage,
                    MessageChannel inChannel, MessageHandler handler, Exception ex) {

                StompHeaderAccessor inAccessor = StompHeaderAccessor.wrap(inMessage);
                String receipt = inAccessor.getReceipt();
                if (StringUtils.isEmpty(receipt)) {
                    return;
                }

                StompHeaderAccessor outAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
                outAccessor.setSessionId(inAccessor.getSessionId());
                outAccessor.setReceiptId(receipt);
                outAccessor.setLeaveMutable(true);

                Message<byte[]> outMessage =
                        MessageBuilder.createMessage(new byte[0], outAccessor.getMessageHeaders());

                outChannel.send(outMessage);
            }
        });
    }
} 

这就是我最后做的。

/**
 * Broker configuration that sends a RECEIPT message after an inbound message carrying a
 * receipt header has been handled.
 *
 * <p>The outbound channel is not injected: it is captured by an interceptor on the client
 * outbound channel once a message has been handled there. Receipts requested before that
 * point are silently skipped.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigurer
        implements org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer {

    // Assigned from outbound-channel executor tasks and read from inbound-channel executor
    // tasks; volatile makes the captured reference visible across those threads.
    private volatile MessageChannel outChannel;

    /** Registers the interceptor that emits the RECEIPT once inbound handling has finished. */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        registration.interceptors(new ExecutorChannelInterceptor() {

            @Override
            public void afterMessageHandled(Message<?> inMessage,
                                            MessageChannel inChannel, MessageHandler handler, Exception ex) {
                MessageChannel target = outChannel; // single read of the volatile field
                if (target == null) {
                    // No outbound channel captured yet, so there is nowhere to send a receipt.
                    return;
                }

                StompHeaderAccessor inAccessor = StompHeaderAccessor.wrap(inMessage);
                String receipt = inAccessor.getReceipt();
                if (StringUtils.isEmpty(receipt)) {
                    return;
                }

                StompHeaderAccessor outAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
                outAccessor.setSessionId(inAccessor.getSessionId());
                outAccessor.setReceiptId(receipt);
                outAccessor.setLeaveMutable(true);

                Message<byte[]> outMessage =
                    MessageBuilder.createMessage(new byte[0], outAccessor.getMessageHeaders());

                target.send(outMessage);
            }
        });
    }

    /** Registers the interceptor that captures the outbound channel for later receipts. */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ExecutorChannelInterceptor() {
            @Override
            public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, @Nullable Exception ex) {
                outChannel = channel;
            }
        });
    }
}

另外,对于测试,您必须为 stompClient 配置 taskScheduler,这样处理回执(receipt)的任务才能被执行。

// Give the STOMP client a task scheduler; per the accompanying text, the tasks that
// process receipts need it in order to run.
stompClient.setTaskScheduler(taskScheduler);

然后需要配置 StompSession.Receiptable 对象。例如,在订阅(Subscription)对象上使用 addReceiptTask:

        /**
         * Enables automatic receipt headers on the session, subscribes this handler to
         * {@code topic}, and registers a receipt task that logs the subscription id and
         * completes {@code connectedFuture}.
         *
         * @param session          the connected STOMP session
         * @param connectedHeaders headers of the CONNECTED frame (not used here)
         */
        @Override
        public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
            session.setAutoReceipt(true);

            final StompSession.Subscription sub = session.subscribe(topic, this);

            // Runs when the receipt for the subscription is processed.
            Runnable onReceipt = () -> {
                logger.info("Receipt for subscription ID {}", sub.getSubscriptionId());
                connectedFuture.complete(null);
            };
            sub.addReceiptTask(onReceipt);

            logger.info("Stomp Subscription ID is '{}' for topic '{}'.",
                    sub.getSubscriptionId(), topic);
        }

如您所见,还有一个 connectedFuture,它在订阅的回执被处理时完成。这样测试就能知道何时可以开始向客户端发送数据。