ActiveMQ JMSXGroupID 未按预期工作
ActiveMQ JMSXGroupID does not work as expected
我使用这个脚本在本地启动 ActiveMQ:
docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq
它启动 AMQ 版本 5.15.6。我通过 websockets (v1.2) 通过 STOMP 连接 AMQ。
通过我的 "test" queue 中的 Web 控制台,我创建了两条消息,它们都属于同一组 "test_grp"。我启动了两个进程,每个进程都运行相同的逻辑:
- 连接
- 用
activemq.prefetchSize: 1
和 ack: client-individual
订阅“/queue/test” headers
- 在收到新消息时休眠 5 秒并发送消息确认
两个进程都立即接收消息,而第二条消息应该由同一进程接收,或者至少应该在第一个消息被确认后接收。
此外,如果我只启动一个 process/subscription 和 activemq.prefetchSize: 2
header,那么此进程会立即收到两条消息,而不是在第一条消息被确认后按顺序接收。
所以 JMSXGroupID 似乎对消息的处理方式没有任何影响。是否有可能在代理端配置不正确?
我确信消息不会自动确认,因为它们仍在 queue 中,直到消费者确认它们。
经过一些测试后,我发现对两个消费者进行分组是可行的。然而,对于 activemq.prefetchSize: 2
的一个消费者,它会立即收到来自同一组的两条消息。这是预期的行为吗?如果是,那么似乎如果有人想按顺序处理消息,他必须在订阅时将 activemq.prefetchSize
设置为 1
?
这是一段代码来测试(Node.js 12.x 并且需要包 @stomp/stompjs
和 websocket
):
Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');
function createClient() {
return new Client({
brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
});
}
function createLog(name) {
return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}
function createConsumer(name) {
const log = createLog(name);
const client = createClient();
client.onConnect = () => {
log('CLIENT_CONNECTED');
client.subscribe('/queue/test', (msg) => {
log('RECEIVED_MESSAGE');
setTimeout(() => {
msg.ack();
log('ACKED_MESSAGE');
}, 10000);
}, {
'activemq.prefetchSize': '2',
ack: 'client-individual',
});
}
client.activate();
}
function publishMessages() {
const log = createLog();
const client = createClient();
client.onConnect = () => {
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 1',
});
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 2',
});
};
client.activate();
}
createConsumer('A');
createConsumer('B');
setTimeout(() => {
publishMessages();
}, 2000);
输出:
22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
这是STOMP客户端收到的headers(没有任何组信息):
headers: {
timestamp: '1580796287376',
persistent: 'true',
'message-id': 'ID:04f803c080ac-46709-1580756713304-3:66:-1:1:2',
priority: '4',
subscription: 'sub-0',
ack: 'ID:04f803c080ac-46709-1580756713304-70:2',
destination: '/queue/test',
expires: '0',
'content-length': '14'
},
这很奇怪,因为 JMSXGroupID 似乎不能保证组中的处理顺序。它只保证消息将以相同的顺序传递给同一消费者,但消息 B 可以在它确认消息 A 之前传递给消费者(甚至 "starts" 处理 A)(假设这里的消费者有 activemq.prefetchSize > 1
) .我不明白这一点,因为由于消费者使用 ack: client-individual
模式,这意味着在消费者发送 ACK 消息之前 不能 被视为由代理传递。那么为什么代理从它确实知道是否已传递来自同一组的先前消息的组发送消息呢?也许可以通过某种方式配置 ActiveMQ 代理来防止这样做?
其他解决方案是以某种方式接收 JMSXGroupID header 以及来自代理的消息,该消息不起作用(至少在 STOMP 中无效)- 在这种情况下,代理可能具有本地 mini-queue(s ) 以正确的顺序在本地对来自同一组的消息进行排序。
我的用例是我想要一个 queue 作为微服务 (/queue/my-service) 的输入通道。其他微服务将发送 messages/events/commands 给它。其中一些可能具有 JMSXGroupID,而另一些可能没有。我想要的是具有一定并发性的单一订阅(activemq.prefetchSize > 1
)。因此,如果收到 6 条消息(A1、A2、A3、B、C、D)"at once",其中三条来自同一组(A1、A3、A3),其他则在不同的组中或不在任何组中那么消费者应该并行处理:
- A1 -> A2 -> A3(顺序)
- B
- C
- D
我在此处的消息中报告了关于缺少 JMSXGroupID header 的错误(它看起来像@Justin Bertram 提到的错误):https://issues.apache.org/jira/browse/AMQ-7395
您看到了预期的行为。如果将 activemq.prefetchSize
设置为大于 1 的值,则代理将一次向客户端发送多于 1 条消息。由于 stompjs 会在收到消息时调用您传递给 subscribe
的回调函数,因此您必须自己控制确认顺序或简单地将 activemq.prefetchSize
设置为 1
.
解决您的一些具体问题...
...JMSXGroupID does not guarantee processing order in group. It only guarantees that messages will be delivered to same consumer in same order...
完全正确。使用 JMSXGroupID
header 对消息进行分组只能保证同一组中的消息将按照代理接收消息的顺序传递给特定的消费者。完成后,确认顺序由客户自己决定。
一般来说,假设消息处理并发来自多个并发消费者,而不是一个消费者(或一组消费者)本身并发处理消息。在这种一般情况下,将分组消息传递给单个消费者足以保证顺序,因为它将连续处理消息。
So why broker sends message from group for which it does know if previous message from same group was delivered?
代理从同一组发送多条消息,因为您通过将 activemq.prefetchSize
设置为大于 1 的值来请求它这样做。
我使用这个脚本在本地启动 ActiveMQ:
docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq
它启动 AMQ 版本 5.15.6。我通过 websockets (v1.2) 通过 STOMP 连接 AMQ。 通过我的 "test" queue 中的 Web 控制台,我创建了两条消息,它们都属于同一组 "test_grp"。我启动了两个进程,每个进程都运行相同的逻辑:
- 连接
- 用
activemq.prefetchSize: 1
和ack: client-individual
订阅“/queue/test” headers - 在收到新消息时休眠 5 秒并发送消息确认
两个进程都立即接收消息,而第二条消息应该由同一进程接收,或者至少应该在第一个消息被确认后接收。
此外,如果我只启动一个 process/subscription 和 activemq.prefetchSize: 2
header,那么此进程会立即收到两条消息,而不是在第一条消息被确认后按顺序接收。
所以 JMSXGroupID 似乎对消息的处理方式没有任何影响。是否有可能在代理端配置不正确?
我确信消息不会自动确认,因为它们仍在 queue 中,直到消费者确认它们。
经过一些测试后,我发现对两个消费者进行分组是可行的。然而,对于 activemq.prefetchSize: 2
的一个消费者,它会立即收到来自同一组的两条消息。这是预期的行为吗?如果是,那么似乎如果有人想按顺序处理消息,他必须在订阅时将 activemq.prefetchSize
设置为 1
?
这是一段代码来测试(Node.js 12.x 并且需要包 @stomp/stompjs
和 websocket
):
Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');
function createClient() {
return new Client({
brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
});
}
function createLog(name) {
return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}
function createConsumer(name) {
const log = createLog(name);
const client = createClient();
client.onConnect = () => {
log('CLIENT_CONNECTED');
client.subscribe('/queue/test', (msg) => {
log('RECEIVED_MESSAGE');
setTimeout(() => {
msg.ack();
log('ACKED_MESSAGE');
}, 10000);
}, {
'activemq.prefetchSize': '2',
ack: 'client-individual',
});
}
client.activate();
}
function publishMessages() {
const log = createLog();
const client = createClient();
client.onConnect = () => {
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 1',
});
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 2',
});
};
client.activate();
}
createConsumer('A');
createConsumer('B');
setTimeout(() => {
publishMessages();
}, 2000);
输出:
22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
这是STOMP客户端收到的headers(没有任何组信息):
headers: {
timestamp: '1580796287376',
persistent: 'true',
'message-id': 'ID:04f803c080ac-46709-1580756713304-3:66:-1:1:2',
priority: '4',
subscription: 'sub-0',
ack: 'ID:04f803c080ac-46709-1580756713304-70:2',
destination: '/queue/test',
expires: '0',
'content-length': '14'
},
这很奇怪,因为 JMSXGroupID 似乎不能保证组中的处理顺序。它只保证消息将以相同的顺序传递给同一消费者,但消息 B 可以在它确认消息 A 之前传递给消费者(甚至 "starts" 处理 A)(假设这里的消费者有 activemq.prefetchSize > 1
) .我不明白这一点,因为由于消费者使用 ack: client-individual
模式,这意味着在消费者发送 ACK 消息之前 不能 被视为由代理传递。那么为什么代理从它确实知道是否已传递来自同一组的先前消息的组发送消息呢?也许可以通过某种方式配置 ActiveMQ 代理来防止这样做?
其他解决方案是以某种方式接收 JMSXGroupID header 以及来自代理的消息,该消息不起作用(至少在 STOMP 中无效)- 在这种情况下,代理可能具有本地 mini-queue(s ) 以正确的顺序在本地对来自同一组的消息进行排序。
我的用例是我想要一个 queue 作为微服务 (/queue/my-service) 的输入通道。其他微服务将发送 messages/events/commands 给它。其中一些可能具有 JMSXGroupID,而另一些可能没有。我想要的是具有一定并发性的单一订阅(activemq.prefetchSize > 1
)。因此,如果收到 6 条消息(A1、A2、A3、B、C、D)"at once",其中三条来自同一组(A1、A3、A3),其他则在不同的组中或不在任何组中那么消费者应该并行处理:
- A1 -> A2 -> A3(顺序)
- B
- C
- D
我在此处的消息中报告了关于缺少 JMSXGroupID header 的错误(它看起来像@Justin Bertram 提到的错误):https://issues.apache.org/jira/browse/AMQ-7395
您看到了预期的行为。如果将 activemq.prefetchSize
设置为大于 1 的值,则代理将一次向客户端发送多于 1 条消息。由于 stompjs 会在收到消息时调用您传递给 subscribe
的回调函数,因此您必须自己控制确认顺序或简单地将 activemq.prefetchSize
设置为 1
.
解决您的一些具体问题...
...JMSXGroupID does not guarantee processing order in group. It only guarantees that messages will be delivered to same consumer in same order...
完全正确。使用 JMSXGroupID
header 对消息进行分组只能保证同一组中的消息将按照代理接收消息的顺序传递给特定的消费者。完成后,确认顺序由客户自己决定。
一般来说,假设消息处理并发来自多个并发消费者,而不是一个消费者(或一组消费者)本身并发处理消息。在这种一般情况下,将分组消息传递给单个消费者足以保证顺序,因为它将连续处理消息。
So why broker sends message from group for which it does know if previous message from same group was delivered?
代理从同一组发送多条消息,因为您通过将 activemq.prefetchSize
设置为大于 1 的值来请求它这样做。