Spring WebSocketMessageBrokerConfigurer 与 Artemis 2.6.3 多播(主题)不工作
Spring WebSocketMessageBrokerConfigurer with Artemis 2.6.3 Multicast (topic) not working
Artemis 多播地址和队列没有预期的行为。
我的想法是为可以拥有多个 websocket 会话(web、android 等)的用户创建组或特定消息。服务器将向 artemis 多播地址发布通知,所有订阅者都应收到通知。在当前场景中,我只是强制用户 'luislaves00' 并创建多个会话。在 artemis 中,我可以看到 2 个消费者(不确定 spring 的 Message Broker Relay 是如何工作的),但消费者的行为就像一个循环而不是发布者-订阅者。使用来自 Spring 的内存代理,它工作正常但不持久,因此当没有订阅者连接时,消息将被丢弃。
这是我正在使用的代码:
客户端部分:
function connect() {
var socket = new SockJS('/notification-websocket');
stompClient = Stomp.over(socket);
var headers = {
// todo: server will handle this logic
'client-id': 'luisalves00',
'durable-subscription-name': 'luisalves00',
'id' : 'luisalves00'
};
stompClient.connect(headers, function(frame) {
setConnected(true);
console.log('Connected: ' + frame);
// todo: server will handle this logic
stompClient.subscribe('/topic/notification/username' + 'luisalves00', function(notification) {
showNotification(JSON.parse(notification.body).content);
}, headers);
});
}
经纪人中继配置:
public void configureMessageBroker(MessageBrokerRegistry config) {
// Artemis ->
// tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true
config.enableStompBrokerRelay("/topic").setRelayHost("127.0.0.1").setRelayPort(61613);
config.setApplicationDestinationPrefixes("/app");
//config.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
logger.info("Registering the stomp endpoints.");
registry.addEndpoint("/notification-websocket").setAllowedOrigins("*").withSockJS();
}
服务器虚拟通知生成器:
@Scheduled(fixedDelay = 20000)
public void scheduleTaskWithFixedDelay() {
final Notification message = new Notification(UUID.randomUUID().toString() + " -> " + dateTimeFormatter.format(LocalDateTime.now()));
try {
final String user = "luisalves00";
logger.info("Creating msg={}", message);
final Map<String, Object> headers = new HashMap<>();
headers.put("subscription-id", user);
template.convertAndSend("/topic/notification/username/" + user, message, headers);
} catch (Exception e) {
logger.error("", e);
}
}
当客户端订阅 artemis 使用此参数创建地址和持久队列时:
Addesses:
id=2147496008 name=/topic/notification/group1/ routingType=[MULTICAST] queueCount=1
Queue
id=2147496011
name=group1.group1
address=/topic/notification/group1/
routingType=MULTICAST
durable=true
maxConsumers-1
purgeOnNoConsumers=false
consumerCount=0
要使作品经久耐用,您必须使用:
'client-id': '<some unique identifier for each client>'
'durable-subscription-name': 'tech-news'
对于我的实现,我停止使用持久的,因为通知的想法是在创建时传送(低延迟)。如果消费者未连接,它可以连接到历史数据库以获取他在连接时未收到的旧消息。
如果您真的想让它持久,我建议在服务器端为连接的用户处理 'client-id' 和持久订阅名称。如果用户没有正在进行的会话,则创建一个持久队列。他创建的下一个会话应该是非持久的,因为它们将收到与持久会话相同的消息。如果第一个会话死亡,他仍然会收到其他会话的消息。当所有人都死掉并且他重新连接时,这将再次成为第一个会话,他将收到所有未在持久队列上传递的消息(可能是他已经在非持久队列上收到的一些消息),但他将拥有历史记录所有消息(如前所述,我认为应该以其他方式处理)。
Artemis 多播地址和队列没有预期的行为。 我的想法是为可以拥有多个 websocket 会话(web、android 等)的用户创建组或特定消息。服务器将向 artemis 多播地址发布通知,所有订阅者都应收到通知。在当前场景中,我只是强制用户 'luislaves00' 并创建多个会话。在 artemis 中,我可以看到 2 个消费者(不确定 spring 的 Message Broker Relay 是如何工作的),但消费者的行为就像一个循环而不是发布者-订阅者。使用来自 Spring 的内存代理,它工作正常但不持久,因此当没有订阅者连接时,消息将被丢弃。 这是我正在使用的代码:
客户端部分:
function connect() {
var socket = new SockJS('/notification-websocket');
stompClient = Stomp.over(socket);
var headers = {
// todo: server will handle this logic
'client-id': 'luisalves00',
'durable-subscription-name': 'luisalves00',
'id' : 'luisalves00'
};
stompClient.connect(headers, function(frame) {
setConnected(true);
console.log('Connected: ' + frame);
// todo: server will handle this logic
stompClient.subscribe('/topic/notification/username' + 'luisalves00', function(notification) {
showNotification(JSON.parse(notification.body).content);
}, headers);
});
}
经纪人中继配置:
public void configureMessageBroker(MessageBrokerRegistry config) {
// Artemis ->
// tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true
config.enableStompBrokerRelay("/topic").setRelayHost("127.0.0.1").setRelayPort(61613);
config.setApplicationDestinationPrefixes("/app");
//config.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
logger.info("Registering the stomp endpoints.");
registry.addEndpoint("/notification-websocket").setAllowedOrigins("*").withSockJS();
}
服务器虚拟通知生成器:
@Scheduled(fixedDelay = 20000)
public void scheduleTaskWithFixedDelay() {
final Notification message = new Notification(UUID.randomUUID().toString() + " -> " + dateTimeFormatter.format(LocalDateTime.now()));
try {
final String user = "luisalves00";
logger.info("Creating msg={}", message);
final Map<String, Object> headers = new HashMap<>();
headers.put("subscription-id", user);
template.convertAndSend("/topic/notification/username/" + user, message, headers);
} catch (Exception e) {
logger.error("", e);
}
}
当客户端订阅 artemis 使用此参数创建地址和持久队列时:
Addesses:
id=2147496008 name=/topic/notification/group1/ routingType=[MULTICAST] queueCount=1
Queue
id=2147496011
name=group1.group1
address=/topic/notification/group1/
routingType=MULTICAST
durable=true
maxConsumers-1
purgeOnNoConsumers=false
consumerCount=0
要使作品经久耐用,您必须使用:
'client-id': '<some unique identifier for each client>'
'durable-subscription-name': 'tech-news'
对于我的实现,我停止使用持久的,因为通知的想法是在创建时传送(低延迟)。如果消费者未连接,它可以连接到历史数据库以获取他在连接时未收到的旧消息。 如果您真的想让它持久,我建议在服务器端为连接的用户处理 'client-id' 和持久订阅名称。如果用户没有正在进行的会话,则创建一个持久队列。他创建的下一个会话应该是非持久的,因为它们将收到与持久会话相同的消息。如果第一个会话死亡,他仍然会收到其他会话的消息。当所有人都死掉并且他重新连接时,这将再次成为第一个会话,他将收到所有未在持久队列上传递的消息(可能是他已经在非持久队列上收到的一些消息),但他将拥有历史记录所有消息(如前所述,我认为应该以其他方式处理)。