Dynamic registration of websocket output adapter is not working

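I am trying to implement dynamic registration of a ServerWebSocketContainer so that I can publish messages to different websocket endpoints at runtime.

Here is the server-side code: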

@Service
public class WebSocketPublisherService {

    @Autowired
    IntegrationFlowContext integrationFlowContext;

    @Bean
    public HandshakeHandler handshakeHandler() {
        return new DefaultHandshakeHandler();
    }

    @Autowired
    RegularPublisher regularPublisher;

    public void startPublishing(String name) {
        ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer(name)
                .setHandshakeHandler(handshakeHandler());

        MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
        methodInvokingMessageSource.setObject(regularPublisher);
        methodInvokingMessageSource.setMethodName("publishEmergency");

        WebSocketOutboundMessageHandler webSocketOutboundMessageHandler = new WebSocketOutboundMessageHandler(serverWebSocketContainer);
        webSocketOutboundMessageHandler.afterPropertiesSet();

        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(methodInvokingMessageSource, polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10000)))
                .split(new CustomMesssageSplitter(serverWebSocketContainer))
                .handle(webSocketOutboundMessageHandler)
                .get();

        integrationFlowContext.registration(standardIntegrationFlow)
                .addBean(serverWebSocketContainer)
                .register();

        standardIntegrationFlow.start();

    }
}


@Component
public class RegularPublisher {

    public String publishEmergency() {
        System.out.println("publishing message");
        return "This is Message from President!";
    }
}

Here is my client code. I am using okhttp 3.5.0 from the com.squareup.okhttp3 groupId:

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
StringListner listener = new StringListner();
WebSocket ws = client.newWebSocket(request, listener);

// Trigger shutdown of the dispatcher's executor so this process can
// exit cleanly.
client.dispatcher().executorService().shutdown();

Here is the listener code:

public class StringListner extends WebSocketListener {

    private static final int NORMAL_CLOSURE_STATUS = 1000;

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        System.out.println("Connection opened");
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        System.out.println("Receiving: " + text);
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        System.out.println("Receiving: " + bytes.hex());
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        webSocket.close(NORMAL_CLOSURE_STATUS, null);
        System.out.println("Closing: " + code + " " + reason);
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        t.printStackTrace();
    }
}

The dynamically registered topic is: /examples

The client connects via the URL: ws://localhost:8080/examples

I get this error when connecting to the websocket from the client:

java.net.ProtocolException: Expected HTTP 101 response but was '404 '
    at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:215)
    at okhttp3.internal.ws.RealWebSocket.onResponse(RealWebSocket.java:182)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

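Runtime registration of server WebSocket endpoints is available only starting with Spring Integration 5.5: https://docs.spring.io/spring-integration/docs/current/reference/html/whats-new.html#x5.5-websocket.

There is a test case covering this: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-websocket/src/test/java/org/springframework/integration/websocket/dsl/WebSocketDslTests.java#L66

UPDATE

There is indeed a problem in the framework when we have not just plain Spring Integration but the whole of Spring Boot. I will fix it somehow, but for now here is a workaround: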

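  1. Use just @SpringBootApplication. It already brings in @EnableIntegration for us.

  2. Do not use @EnableWebSocket, because it overrides whatever Spring Integration wants to do. In other words, @EnableWebSocket is not compatible with the Spring Integration WebSocket support (or vice versa).

  3. You have to put this bean into your application context: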

     @Bean
     public static BeanPostProcessor integrationDynamicWebSocketHandlerMappingWorkaround() {
         return new BeanPostProcessor() {
    
             @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                 if (bean.getClass().getSimpleName().equals("IntegrationDynamicWebSocketHandlerMapping")) {
                     ((AbstractHandlerMapping) bean).setOrder(0);
                 }
                 return bean;
             }
         };
     }
    

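The problem is that IntegrationDynamicWebSocketHandlerMapping comes with the default order, so it ends up after Spring Boot's default SimpleUrlHandlerMapping, which fails fast when there is no mapping for the requested URL.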
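
To make the ordering effect concrete, here is a toy model in plain Java. It does not use any Spring classes: `Mapping`, `dispatch`, `mappings`, and the order values are all hypothetical names and numbers chosen for illustration. It only assumes what is described above, namely that mappings are consulted in ascending order and that a fail-fast mapping answers 404 for an unknown URL before any later mapping is asked.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class HandlerOrderDemo {

    /** Hypothetical stand-in for a handler mapping; not a Spring type. */
    public static final class Mapping {
        final String name;
        final int order;        // lower value is consulted first
        final Set<String> paths;
        final boolean failFast; // answers 404 itself when the path is unknown

        Mapping(String name, int order, Set<String> paths, boolean failFast) {
            this.name = name;
            this.order = order;
            this.paths = paths;
            this.failFast = failFast;
        }
    }

    /** Walks the mappings in ascending order and returns the first decision. */
    public static String dispatch(List<Mapping> mappings, String path) {
        List<Mapping> sorted = new ArrayList<>(mappings);
        sorted.sort(Comparator.comparingInt(m -> m.order));
        for (Mapping m : sorted) {
            if (m.paths.contains(path)) {
                return "101 via " + m.name;
            }
            if (m.failFast) {
                return "404 via " + m.name;
            }
        }
        return "404 (no mapping)";
    }

    /** A fail-fast static mapping plus a dynamic one that knows /examples. */
    public static List<Mapping> mappings(int dynamicOrder) {
        List<Mapping> list = new ArrayList<>();
        // Illustrative order value only; the point is that it is below the default.
        list.add(new Mapping("static", Integer.MAX_VALUE - 1,
                Collections.<String>emptySet(), true));
        list.add(new Mapping("dynamic", dynamicOrder,
                Collections.singleton("/examples"), false));
        return list;
    }

    public static void main(String[] args) {
        // Default (lowest-precedence) order: the fail-fast mapping answers first.
        System.out.println(dispatch(mappings(Integer.MAX_VALUE), "/examples")); // 404 via static
        // Order 0, as in the workaround: the dynamic mapping is consulted first.
        System.out.println(dispatch(mappings(0), "/examples")); // 101 via dynamic
    }
}
```

In this sketch, moving the dynamic mapping to order 0 is the analogue of the `setOrder(0)` call in the BeanPostProcessor workaround: the dynamically registered endpoint is looked up before the fail-fast mapping gets a chance to reject the request.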