无法在nodejs中使用STOMP协议使用ActiveMQ优先级消息
Unable to use ActiveMQ priority messages using STOMP protocol in nodejs
我有一个将消息发送到队列的应用程序,以及另一个订阅队列并处理它的应用程序。我希望 OTP 消息的优先级高于其他消息,因此我尝试使用 ActiveMQ 消息优先级来实现这一点。
这是使用 stompit 库在 nodejs 中使用 STOMP 协议的 ActiveMQ 连接代码:
const serverPrimary = {
host: keys.activeMQ.host,
port: keys.activeMQ.port,
ssl: ssl,
connectHeaders: {
host: '/',
login: keys.activeMQ.username,
passcode: keys.activeMQ.password,
'heart-beat': '5000,5000',
},
}
connManager = new stompit.ConnectFailover(
[serverPrimary, serverFailover],
reconnectOptions,
)
connManager.on('error', function (e) {
const connectArgs = e.connectArgs
const address = connectArgs.host + ':' + connectArgs.port
logger.error({ error: e, customMessage: address })
})
channelPool = new stompit.ChannelPool(connManager)
发送消息代码
const pushMessageToAMQ = (queue, message) => {
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
_mqLog(mqLogMessages.unknownQueue + queue)
return
}
//Priority header is set
const header = {
destination: queue,
priority: 7
}
//If message is not a string
if (typeof message !== 'string') message = JSON.stringify(message)
//Logging message before sending
_mqLog(
mqLogMessages.sending,
{ service: services.amq },
{ header: header, message: message },
)
//Sending message to amq
_sendMessageToAMQ(header, message, error => {
if (error) {
_mqError(error, mqLogMessages.sendingError, { service: services.amq })
}
})
}
const _sendMessageToAMQ = (headers, body, callback) => {
channelPool.channel((error, channel) => {
if (error) {
callback(error)
return
}
channel.send(headers, body, callback)
})
}
这是在第二个应用程序中订阅队列的代码:
const amqSubscribe = (queue, callback, ack = 'client-individual') => {
log({ customMessage: 'Subscribing to ' + queue })
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
return
}
channelPool.channel((error, channel) => {
let header = {
destination: queue,
ack: ack,
'activemq.prefetchSize': 1,
}
//Check for error
if (error) {
_mqError(error, mqLogMessages.baseError, header)
} else {
channel.subscribe(
header,
_synchronisedHandler((error, message, next) => {
//Check for error
if (error) {
_mqError(error, mqLogMessages.subscriptionError, header)
next()
} else {
//Read message
message.readString('utf-8', function (error, body) {
if (error) {
_mqError(error, mqLogMessages.readError, header)
next()
} else {
//Message read successfully call callback
callback(body, () => {
//Acknowledgment callback
channel.ack(message)
next()
})
}
})
}
}),
)
}
})
}
Activemq.xml
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......
我尝试推送具有不同优先级的不同消息,并在所有消息被推送到队列后打开第二个应用程序(即订阅消息的应用程序)。但是,消息的执行顺序与发送的顺序相同。优先级没有任何改变。有什么我想念的吗?
我是否必须在消费者端添加一些东西才能工作?
ActiveMQ“Classic”(由 Amazon MQ 使用)默认禁用对优先级的支持。正如 documentation 所述:
...support [for message priority] is disabled by default so it needs to be be enabled using per destination policies through xml configuration...
您需要在 policyEntry
中为您的队列设置 prioritizedMessages="true"
,例如:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true"/>
...
要清楚,这是在 activemq.xml
中的 broker(即不是客户端)上配置的,它适用于 every 那种客户。
我有一个将消息发送到队列的应用程序,以及另一个订阅队列并处理它的应用程序。我希望 OTP 消息的优先级高于其他消息,因此我尝试使用 ActiveMQ 消息优先级来实现这一点。
这是使用 stompit 库在 nodejs 中使用 STOMP 协议的 ActiveMQ 连接代码:
const serverPrimary = {
host: keys.activeMQ.host,
port: keys.activeMQ.port,
ssl: ssl,
connectHeaders: {
host: '/',
login: keys.activeMQ.username,
passcode: keys.activeMQ.password,
'heart-beat': '5000,5000',
},
}
connManager = new stompit.ConnectFailover(
[serverPrimary, serverFailover],
reconnectOptions,
)
connManager.on('error', function (e) {
const connectArgs = e.connectArgs
const address = connectArgs.host + ':' + connectArgs.port
logger.error({ error: e, customMessage: address })
})
channelPool = new stompit.ChannelPool(connManager)
发送消息代码
const pushMessageToAMQ = (queue, message) => {
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
_mqLog(mqLogMessages.unknownQueue + queue)
return
}
//Priority header is set
const header = {
destination: queue,
priority: 7
}
//If message is not a string
if (typeof message !== 'string') message = JSON.stringify(message)
//Logging message before sending
_mqLog(
mqLogMessages.sending,
{ service: services.amq },
{ header: header, message: message },
)
//Sending message to amq
_sendMessageToAMQ(header, message, error => {
if (error) {
_mqError(error, mqLogMessages.sendingError, { service: services.amq })
}
})
}
const _sendMessageToAMQ = (headers, body, callback) => {
channelPool.channel((error, channel) => {
if (error) {
callback(error)
return
}
channel.send(headers, body, callback)
})
}
这是在第二个应用程序中订阅队列的代码:
const amqSubscribe = (queue, callback, ack = 'client-individual') => {
log({ customMessage: 'Subscribing to ' + queue })
const queues = Object.values(activeMQ.queues)
if (!queues.includes(queue)) {
return
}
channelPool.channel((error, channel) => {
let header = {
destination: queue,
ack: ack,
'activemq.prefetchSize': 1,
}
//Check for error
if (error) {
_mqError(error, mqLogMessages.baseError, header)
} else {
channel.subscribe(
header,
_synchronisedHandler((error, message, next) => {
//Check for error
if (error) {
_mqError(error, mqLogMessages.subscriptionError, header)
next()
} else {
//Read message
message.readString('utf-8', function (error, body) {
if (error) {
_mqError(error, mqLogMessages.readError, header)
next()
} else {
//Message read successfully call callback
callback(body, () => {
//Acknowledgment callback
channel.ack(message)
next()
})
}
})
}
}),
)
}
})
}
Activemq.xml
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......
我尝试推送具有不同优先级的不同消息,并在所有消息被推送到队列后打开第二个应用程序(即订阅消息的应用程序)。但是,消息的执行顺序与发送的顺序相同。优先级没有任何改变。有什么我想念的吗? 我是否必须在消费者端添加一些东西才能工作?
ActiveMQ“Classic”(由 Amazon MQ 使用)默认禁用对优先级的支持。正如 documentation 所述:
...support [for message priority] is disabled by default so it needs to be be enabled using per destination policies through xml configuration...
您需要在 policyEntry
中为您的队列设置 prioritizedMessages="true"
,例如:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" prioritizedMessages="true"/>
...
要清楚,这是在 activemq.xml
中的 broker(即不是客户端)上配置的,它适用于 every 那种客户。