无法在nodejs中使用STOMP协议使用ActiveMQ优先级消息

Unable to use ActiveMQ priority messages using STOMP protocol in nodejs

我有一个将消息发送到队列的应用程序,以及另一个订阅队列并处理它的应用程序。我希望 OTP 消息的优先级高于其他消息,因此我尝试使用 ActiveMQ 消息优先级来实现这一点。

这是使用 stompit 库在 nodejs 中使用 STOMP 协议的 ActiveMQ 连接代码:

const serverPrimary = {
  host: keys.activeMQ.host,
  port: keys.activeMQ.port,
  ssl: ssl,
  connectHeaders: {
    host: '/',
    login: keys.activeMQ.username,
    passcode: keys.activeMQ.password,
    'heart-beat': '5000,5000',
  },
}
connManager = new stompit.ConnectFailover(
  [serverPrimary, serverFailover],
  reconnectOptions,
)
connManager.on('error', function (e) {
  const connectArgs = e.connectArgs
  const address = connectArgs.host + ':' + connectArgs.port
  logger.error({ error: e, customMessage: address })
})

    channelPool = new stompit.ChannelPool(connManager)

发送消息代码

const pushMessageToAMQ = (queue, message) => {
      const queues = Object.values(activeMQ.queues)
      if (!queues.includes(queue)) {
        _mqLog(mqLogMessages.unknownQueue + queue)
        return
      }
      //Priority header is set
      const header = {
        destination: queue,
        priority: 7
      }
      //If message is not a string
      if (typeof message !== 'string') message = JSON.stringify(message)
      //Logging message before sending
      _mqLog(
        mqLogMessages.sending,
        { service: services.amq },
        { header: header, message: message },
      )
      //Sending message to amq
      _sendMessageToAMQ(header, message, error => {
        if (error) {
          _mqError(error, mqLogMessages.sendingError, { service: services.amq })
        }
      })
    }

const _sendMessageToAMQ = (headers, body, callback) => {
      channelPool.channel((error, channel) => {
        if (error) {
          callback(error)
          return
        }
        channel.send(headers, body, callback)
      })
    }

这是在第二个应用程序中订阅队列的代码:

const amqSubscribe = (queue, callback, ack = 'client-individual') => {
  log({ customMessage: 'Subscribing to ' + queue })
  const queues = Object.values(activeMQ.queues)
  if (!queues.includes(queue)) {
    return
  }
  channelPool.channel((error, channel) => {
    let header = {
      destination: queue,
      ack: ack,
      'activemq.prefetchSize': 1,
    }
    //Check for error
    if (error) {
      _mqError(error, mqLogMessages.baseError, header)
    } else {
      channel.subscribe(
        header,
        _synchronisedHandler((error, message, next) => {
          //Check for error
          if (error) {
            _mqError(error, mqLogMessages.subscriptionError, header)
            next()
          } else {
            //Read message
            message.readString('utf-8', function (error, body) {
              if (error) {
                _mqError(error, mqLogMessages.readError, header)
                next()
              } else {
                //Message read successfully call callback
                callback(body, () => {
                  //Acknowledgment callback
                  channel.ack(message)
                  next()
                })
              }
            })
          }
        }),
      )
    }
  })
}

Activemq.xml

<policyEntries>
            <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......

我尝试推送具有不同优先级的不同消息,并在所有消息被推送到队列后打开第二个应用程序(即订阅消息的应用程序)。但是,消息的执行顺序与发送的顺序相同。优先级没有任何改变。有什么我想念的吗? 我是否必须在消费者端添加一些东西才能工作?

ActiveMQ“Classic”(由 Amazon MQ 使用)默认禁用对优先级的支持。正如 documentation 所述:

...support [for message priority] is disabled by default so it needs to be be enabled using per destination policies through xml configuration...

您需要在 policyEntry 中为您的队列设置 prioritizedMessages="true",例如:

 <destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" prioritizedMessages="true"/>
      ...

要清楚,这是在 activemq.xml 中的 broker(即不是客户端)上配置的,它适用于 every 那种客户。