Add STOMP header without recreating Message on ChannelInterceptorAdapter

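I need to add a header to a STOMP message. It currently works as shown below, but I am recreating the message. For performance reasons, is it possible to add just the native header without having to recreate the message?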

public class MyChannelInterceptor extends ChannelInterceptorAdapter {


    @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        StompCommand command = accessor.getCommand();
        if(command != null) {
            log.debug("Receiving msg {} from {}",command,accessor.getUser().getName());
            if(command == StompCommand.SEND) {
                log.debug("Adding expires header to  msg {} from {}",command,accessor.getUser().getName());
                String ttlString = accessor.getFirstNativeHeader("ttl");
                long ttl = 30000;
                try {
                    ttl = Long.parseLong(ttlString);
                } 
                catch(Exception ex) {
                    log.error("TTL header received but not in correct format {}",ttlString);
                }
                accessor.addNativeHeader("expires", Long.toString(System.currentTimeMillis() + ttl));

                return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
            }
        }
        return message;
      }

}

Since addNativeHeader succeeds, the message is still mutable; see addNativeHeader().

In any case, since the NATIVE_HEADERS message header is a MultiValueMap-valued header, you can update the header contents in place.

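Hence, there is no need to create a new message.

You would only need a new message if you were adding a new header to the message itself (rather than updating the contents of an existing header).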
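To illustrate the distinction between updating the contents of an existing multi-valued header and adding a new top-level header, here is a minimal plain-Java sketch. It deliberately does not use Spring's classes: the class name, the method names, and the "nativeHeaders" key are hypothetical stand-ins for the message headers and the NATIVE_HEADERS entry, and the read-only outer map is only an assumption used to model headers that can no longer be changed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in model, not Spring code: a read-only header map whose
// "nativeHeaders" entry is itself a mutable multi-valued map.
final class InPlaceHeaderSketch {

    // Builds a read-only outer header map holding one mutable multi-valued entry.
    static Map<String, Object> newHeaders() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("nativeHeaders", new HashMap<String, List<String>>());
        return Collections.unmodifiableMap(raw);
    }

    // Appends a value to the multi-valued entry without touching the outer map.
    @SuppressWarnings("unchecked")
    static void addNativeHeader(Map<String, Object> headers, String name, String value) {
        Map<String, List<String>> nativeHeaders =
                (Map<String, List<String>>) headers.get("nativeHeaders");
        nativeHeaders.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
    }

    // Tries to add a brand-new top-level entry; this fails on the read-only outer map.
    static boolean canAddTopLevelHeader(Map<String, Object> headers) {
        try {
            headers.put("newHeader", "value");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = newHeaders();
        addNativeHeader(headers, "expires", "12345");
        System.out.println(headers.get("nativeHeaders"));
        System.out.println(canAddTopLevelHeader(headers));
    }
}
```

In this model the in-place update is visible through the same header map, so nothing needs to be rebuilt, while a new top-level entry would require building a new map (the analogue of creating a new message).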

EDIT

I just ran a test; as long as the message is still mutable, you can change it...

@Test
public void test() {
    Map<String, Object> map = new HashMap<String, Object>();
    MutableMessageHeaders headers = new MutableMessageHeaders(map);
    Message<String> message = MessageBuilder.createMessage("foo", headers);
    StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
    accessor.addNativeHeader("foo", "bar");
    System.out.println(message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS));
    accessor.setImmutable();
    try {
        accessor.addNativeHeader("baz", "qux");
        fail("expected IllegalStateException");
    }
    catch (IllegalStateException e) {
        // expected: the accessor is now immutable, so the header cannot be added
    }
}

That said, are you seeing an actual performance problem, or only a perceived one? Creating a message is not expensive.

This is what I was looking for:

StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

The code above gets the message's actual StompHeaderAccessor, so if you manipulate the native headers, the changes are reflected directly in the message, whereas

StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

gets a clone of the headers, and you then have to create a new message from the newly cloned headers.

The complete fixed code is below:

@Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {

        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
       // StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if(accessor != null) {
            StompCommand command = accessor.getCommand();
            if(command != null) {
                log.debug("Receiving msg {} from {}",command,accessor.getUser().getName());
                if(command == StompCommand.SEND) {

                    log.debug("Adding expires header to  msg {} from {}",command,accessor.getUser().getName());
                    String ttlString = accessor.getFirstNativeHeader("ttl");
                    long ttl = 30000;
                    if(ttlString != null) {
                        try {
                            ttl = Long.parseLong(ttlString);
                        } 
                        catch(Exception ex) {
                            log.error("TTL header received but not in correct format {}",ttlString);
                        }
                    }

                    accessor.addNativeHeader("expires", Long.toString(System.currentTimeMillis() + ttl));
                    // There is no longer any need to create a new message
                    //return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
                }
            }
        }
        return message;
      }
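
The TTL handling in the interceptor above could also be pulled out into a small helper so it can be checked without a running broker. The following is only a sketch in plain Java: the class name ExpiresHeaderSketch and both method names are hypothetical and do not appear in the original code. It keeps the same 30000 ms default and falls back to it when the "ttl" header is missing or malformed.

```java
// Hypothetical helper, not part of the original interceptor.
final class ExpiresHeaderSketch {

    // Same default as the interceptor: 30 seconds.
    static final long DEFAULT_TTL_MILLIS = 30_000L;

    // Returns the parsed TTL, or the default when the header is absent or not a valid long.
    static long parseTtl(String ttlString) {
        if (ttlString == null) {
            return DEFAULT_TTL_MILLIS;
        }
        try {
            return Long.parseLong(ttlString);
        } catch (NumberFormatException ex) {
            return DEFAULT_TTL_MILLIS;
        }
    }

    // Computes the "expires" header value from a clock reading and the raw "ttl" header.
    static String expiresValue(long nowMillis, String ttlString) {
        return Long.toString(nowMillis + parseTtl(ttlString));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(expiresValue(now, "5000"));
        System.out.println(expiresValue(now, null));
    }
}
```

Inside preSend, the call would then look something like accessor.addNativeHeader("expires", ExpiresHeaderSketch.expiresValue(System.currentTimeMillis(), ttlString)), assuming the helper is on the classpath.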