Dynamic registration of websocket output adapter is not working
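I am trying to implement dynamic registration of a ServerWebSocketContainer so that I can publish messages to different websocket endpoints at runtime.
Here is the server-side code: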
@Service
public class WebSocketPublisherService {

    @Autowired
    IntegrationFlowContext integrationFlowContext;

    @Bean
    public HandshakeHandler handshakeHandler() {
        return new DefaultHandshakeHandler();
    }

    @Autowired
    RegularPublisher regularPublisher;

    public void startPublishing(String name) {
        ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer(name)
                .setHandshakeHandler(handshakeHandler());
        MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
        methodInvokingMessageSource.setObject(regularPublisher);
        methodInvokingMessageSource.setMethodName("publishEmergency");
        WebSocketOutboundMessageHandler webSocketOutboundMessageHandler =
                new WebSocketOutboundMessageHandler(serverWebSocketContainer);
        webSocketOutboundMessageHandler.afterPropertiesSet();
        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows
                .from(methodInvokingMessageSource,
                        polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10000)))
                .split(new CustomMesssageSplitter(serverWebSocketContainer))
                .handle(webSocketOutboundMessageHandler)
                .get();
        integrationFlowContext.registration(standardIntegrationFlow)
                .addBean(serverWebSocketContainer)
                .register();
        standardIntegrationFlow.start();
    }
}

@Component
public class RegularPublisher {

    public String publishEmergency() {
        System.out.println("publishing message");
        return "This is Message from Presidenet!";
    }
}
Here is my client code. I am using okhttp 3.5.0 from the com.squareup.okhttp3 groupId.
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
StringListner listener = new StringListner();
WebSocket ws = client.newWebSocket(request, listener);
// Trigger shutdown of the dispatcher's executor so this process can
// exit cleanly.
client.dispatcher().executorService().shutdown();
Here is the listener code:
public class StringListner extends WebSocketListener {

    private static final int NORMAL_CLOSURE_STATUS = 1000;

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        System.out.println("Connection opened");
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        System.out.println("Receiving: " + text);
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        System.out.println("Receiving: " + bytes.hex());
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        webSocket.close(NORMAL_CLOSURE_STATUS, null);
        System.out.println("Closing: " + code + " " + reason);
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        t.printStackTrace();
    }
}
The dynamically registered topic is: /examples
The client connects via the URL: ws://localhost:8080/examples
I get this error when connecting to the websocket from the client:
java.net.ProtocolException: Expected HTTP 101 response but was '404 '
at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:215)
at okhttp3.internal.ws.RealWebSocket.onResponse(RealWebSocket.java:182)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Runtime registration of server-side WebSocket endpoints is available only starting with Spring Integration 5.5: https://docs.spring.io/spring-integration/docs/current/reference/html/whats-new.html#x5.5-websocket
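For context, the exception in the question is thrown by the client's handshake check: a WebSocket connection starts as an HTTP upgrade request, and the client accepts only a 101 response. The snippet below is a simplified plain-Java model of that check, not okhttp's actual source; the class and method names are hypothetical, and the empty reason phrase is an assumption based on the trailing space in the reported message.

```java
import java.net.ProtocolException;

/** Simplified, hypothetical model of a WebSocket client's handshake status check. */
final class HandshakeCheckSketch {

    /** Throws unless the server answered the upgrade request with 101 Switching Protocols. */
    static void checkUpgradeResponse(int statusCode, String reasonPhrase) throws ProtocolException {
        if (statusCode != 101) {
            throw new ProtocolException(
                    "Expected HTTP 101 response but was '" + statusCode + " " + reasonPhrase + "'");
        }
    }

    public static void main(String[] args) {
        try {
            // Assumed scenario: nothing is mapped to /examples, so the server replies 404
            // with an empty reason phrase instead of upgrading the connection.
            checkUpgradeResponse(404, "");
        } catch (ProtocolException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In other words, a 404 at this stage means the upgrade request never reached a WebSocket handler on the server, which points at endpoint registration rather than at the client code.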
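UPDATE
There is indeed a problem in the framework when we have not just plain Spring Integration but the whole of Spring Boot. I will fix it somehow, but for now here is a workaround:
Use only @SpringBootApplication. It brings in @EnableIntegration for us.
Do not use @EnableWebSocket, because it overrides whatever Spring Integration wants to do. In other words, @EnableWebSocket is not compatible with the Spring Integration WebSocket support (or vice versa).
You have to put this bean into your application context: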
@Bean
public static BeanPostProcessor integrationDynamicWebSocketHandlerMappingWorkaround() {
    return new BeanPostProcessor() {

        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            if (bean.getClass().getSimpleName().equals("IntegrationDynamicWebSocketHandlerMapping")) {
                ((AbstractHandlerMapping) bean).setOrder(0);
            }
            return bean;
        }
    };
}
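The problem is that IntegrationDynamicWebSocketHandlerMapping comes with the default order, so it is added after Spring Boot's default SimpleUrlHandlerMapping, which fails fast when there is no mapping for the requested URL.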
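To illustrate why the order matters, here is a minimal plain-Java model of ordered handler mappings. It is only a sketch: none of the types are Spring classes, the `Mapping` type, the `dispatch` and `mappings` helpers, and the numeric order values are assumptions made for the example, and the catch-all mapping merely stands in for a mapping that answers every URL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Plain-Java model of ordered handler mappings; not Spring's implementation. */
final class HandlerOrderSketch {

    /** A mapping has an order and answers a path with an HTTP status, or empty if it does not apply. */
    static final class Mapping {
        final String name;
        final int order;
        final Function<String, Optional<Integer>> lookup;

        Mapping(String name, int order, Function<String, Optional<Integer>> lookup) {
            this.name = name;
            this.order = order;
            this.lookup = lookup;
        }
    }

    /** Consults mappings by ascending order; the first one that answers wins. */
    static int dispatch(List<Mapping> mappings, String path) {
        List<Mapping> sorted = new ArrayList<>(mappings);
        sorted.sort(Comparator.comparingInt((Mapping m) -> m.order));
        for (Mapping mapping : sorted) {
            Optional<Integer> status = mapping.lookup.apply(path);
            if (status.isPresent()) {
                return status.get();
            }
        }
        return 404;
    }

    /** Builds the two mappings, giving the dynamic one the supplied order (illustrative values). */
    static List<Mapping> mappings(int dynamicOrder) {
        // Catch-all mapping: claims every path and fails fast with 404.
        Mapping catchAll = new Mapping("catch-all", Integer.MAX_VALUE - 1, path -> Optional.of(404));
        // Dynamic WebSocket mapping: upgrades only the path registered at runtime.
        Mapping dynamic = new Mapping("dynamic-websocket", dynamicOrder,
                path -> path.equals("/examples") ? Optional.of(101) : Optional.empty());
        return Arrays.asList(catchAll, dynamic);
    }

    public static void main(String[] args) {
        // Lowest precedence: the catch-all is consulted first and rejects the request.
        System.out.println(dispatch(mappings(Integer.MAX_VALUE), "/examples"));
        // Order 0, as in the workaround: the dynamic mapping is consulted first.
        System.out.println(dispatch(mappings(0), "/examples"));
    }
}
```

With the dynamic mapping at lowest precedence the model prints 404, and with order 0 it prints 101, which mirrors the effect of calling setOrder(0) in the BeanPostProcessor above.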