使用 JMS API 访问 IBM MQ 的 BOTHRESH 值
Access BOTHRESH value of IBM MQ using JMS API
我正在使用 spring 引导和 mq-jms-spring-boot-starter
创建一个 JMS 侦听器应用程序,它从队列中读取消息,处理它并将消息转发到另一个队列。
如果出现有害消息情况,我正在尝试生成警报。但是,为了不对同一消息生成多个警报,我正在考虑将 JMSXDeliveryCount
与 BOTHRESH
值进行比较,并在发送到 BOQ 之前的最后一次重新投递中生成警报。
BOTHRESH
和BOQNAME
为源队列配置。
@JmsListener(destination = "${sourceQueue}")
public void processMessages(Message message) {
TextMessage msg = (TextMessage) message;
int boThresh;
int redeliveryCount;
try {
boThresh = message.getIntProperty("<WHAT COMES HERE>");
redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
String processedMessage = this.processMessage(message);
this.forwardMessage("destinationQueue", processedMessage);
} catch (Exception e) {
if (redeliveryCount >= boThresh) {
//generate alert here
}
}
}
这里应该如何获取BOTHRESH
的值呢?有可能吗?我尝试使用 getPropertyNames()
方法获取所有可用属性,以下是我看到的所有属性。
JMS_IBM_Format
JMS_IBM_PutDate
JMS_IBM_Character_Set
JMSXDeliveryCount
JMS_IBM_MsgType
JMSXUserID
JMS_IBM_Encoding
JMS_IBM_PutTime
JMSXAppID
JMS_IBM_PutApplType
这听起来像是在混合使用可重试和不可重试的错误处理。
如果您正在跟踪重新投递并需要发送警报,那么您可能不想设置 BOTHRESH 值,而是在您的客户端代码中管理它。
推荐的消费者错误处理模式:
如果消息无效(即.. 错误 JSON 或 XML)立即移至 DLQ。邮件质量永远不会提高,没有理由重复重试。
如果处理中的 'next step' 出现故障(即数据库),则拒绝投递并允许重新投递延迟和退出重试。这也有利于其他消费者尝试处理消息的队列,并消除了一个消费者无法阻止消息的问题。
另外,请考虑使用客户端消费者代码进行监控和警报可能会出现问题,因为它结合了不同的功能。如果您的目标是跟踪无效消息,监视 DLQ 通常是更好的设计模式,它会从您的消费者代码中删除 'monitoring' 代码。
这样就可以了,但是代码确实需要对管理通道的管理员访问权限,这对于客户端应用程序来说可能不是最佳选择。
配置
import com.ibm.mq.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.ibm.mq.constants.CMQC;
import java.util.Hashtable;
@Configuration
public class MQConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${ibm.mq.queueManager:QM1}")
public String qMgrName;
@Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
private String adminChannel;
@Value("${app.mq.host:localhost}")
private String host;
@Value("${app.mq.host.port:1414}")
private int port;
@Value("${app.mq.adminuser:admin}")
private String adminUser;
@Value("${app.mq.adminpassword:passw0rd}")
private String password;
@Bean
public MQQueueManager mqQueueManager() {
try {
Hashtable<String,Object> connectionProperties = new Hashtable<String,Object>();
connectionProperties.put(CMQC.CHANNEL_PROPERTY, adminChannel);
connectionProperties.put(CMQC.HOST_NAME_PROPERTY, host);
connectionProperties.put(CMQC.PORT_PROPERTY, port);
connectionProperties.put(CMQC.USER_ID_PROPERTY, adminUser);
connectionProperties.put(CMQC.PASSWORD_PROPERTY, password);
return new MQQueueManager(qMgrName, connectionProperties);
} catch (MQException e) {
logger.warn("MQException obtaining MQQueueManager");
logger.warn(e.getMessage());
}
return null;
}
}
获取队列的退出阈值
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Runner {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.mq.queue:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
Runner(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Bean
CommandLineRunner init() {
return (args) -> {
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[MQC.MQ_Q_NAME_LENGTH];
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
String backoutQname = new String(charAttrs);
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining threshold");
logger.warn(e.getMessage());
}
};
}
}
我正在使用 spring 引导和 mq-jms-spring-boot-starter
创建一个 JMS 侦听器应用程序,它从队列中读取消息,处理它并将消息转发到另一个队列。
如果出现有害消息情况,我正在尝试生成警报。但是,为了不对同一消息生成多个警报,我正在考虑将 JMSXDeliveryCount
与 BOTHRESH
值进行比较,并在发送到 BOQ 之前的最后一次重新投递中生成警报。
BOTHRESH
和BOQNAME
为源队列配置。
@JmsListener(destination = "${sourceQueue}")
public void processMessages(Message message) {
TextMessage msg = (TextMessage) message;
int boThresh;
int redeliveryCount;
try {
boThresh = message.getIntProperty("<WHAT COMES HERE>");
redeliveryCount = message.getIntProperty("JMSXDeliveryCount");
String processedMessage = this.processMessage(message);
this.forwardMessage("destinationQueue", processedMessage);
} catch (Exception e) {
if (redeliveryCount >= boThresh) {
//generate alert here
}
}
}
这里应该如何获取BOTHRESH
的值呢?有可能吗?我尝试使用 getPropertyNames()
方法获取所有可用属性,以下是我看到的所有属性。
JMS_IBM_Format
JMS_IBM_PutDate
JMS_IBM_Character_Set
JMSXDeliveryCount
JMS_IBM_MsgType
JMSXUserID
JMS_IBM_Encoding
JMS_IBM_PutTime
JMSXAppID
JMS_IBM_PutApplType
这听起来像是在混合使用可重试和不可重试的错误处理。 如果您正在跟踪重新投递并需要发送警报,那么您可能不想设置 BOTHRESH 值,而是在您的客户端代码中管理它。
推荐的消费者错误处理模式:
如果消息无效(即.. 错误 JSON 或 XML)立即移至 DLQ。邮件质量永远不会提高,没有理由重复重试。
如果处理中的 'next step' 出现故障(即数据库),则拒绝投递并允许重新投递延迟和退出重试。这也有利于其他消费者尝试处理消息的队列,并消除了一个消费者无法阻止消息的问题。
另外,请考虑使用客户端消费者代码进行监控和警报可能会出现问题,因为它结合了不同的功能。如果您的目标是跟踪无效消息,监视 DLQ 通常是更好的设计模式,它会从您的消费者代码中删除 'monitoring' 代码。
这样就可以了,但是代码确实需要对管理通道的管理员访问权限,这对于客户端应用程序来说可能不是最佳选择。
配置
import com.ibm.mq.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.ibm.mq.constants.CMQC;
import java.util.Hashtable;
@Configuration
public class MQConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${ibm.mq.queueManager:QM1}")
public String qMgrName;
@Value("${app.mq.admin.channel:DEV.ADMIN.SVRCONN}")
private String adminChannel;
@Value("${app.mq.host:localhost}")
private String host;
@Value("${app.mq.host.port:1414}")
private int port;
@Value("${app.mq.adminuser:admin}")
private String adminUser;
@Value("${app.mq.adminpassword:passw0rd}")
private String password;
@Bean
public MQQueueManager mqQueueManager() {
try {
Hashtable<String,Object> connectionProperties = new Hashtable<String,Object>();
connectionProperties.put(CMQC.CHANNEL_PROPERTY, adminChannel);
connectionProperties.put(CMQC.HOST_NAME_PROPERTY, host);
connectionProperties.put(CMQC.PORT_PROPERTY, port);
connectionProperties.put(CMQC.USER_ID_PROPERTY, adminUser);
connectionProperties.put(CMQC.PASSWORD_PROPERTY, password);
return new MQQueueManager(qMgrName, connectionProperties);
} catch (MQException e) {
logger.warn("MQException obtaining MQQueueManager");
logger.warn(e.getMessage());
}
return null;
}
}
获取队列的退出阈值
import com.ibm.mq.*;
import com.ibm.mq.constants.CMQC;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class Runner {
protected final Log logger = LogFactory.getLog(getClass());
@Value("${app.mq.queue:DEV.QUEUE.1}")
private String queueName = "";
private final MQQueueManager mqQueueManager;
Runner(MQQueueManager mqQueueManager) {
this.mqQueueManager = mqQueueManager;
}
@Bean
CommandLineRunner init() {
return (args) -> {
logger.info("Determining Backout threshold");
try {
int[] selectors = {
CMQC.MQIA_BACKOUT_THRESHOLD,
CMQC.MQCA_BACKOUT_REQ_Q_NAME };
int[] intAttrs = new int[1];
byte[] charAttrs = new byte[MQC.MQ_Q_NAME_LENGTH];
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE | MQC.MQOO_SAVE_ALL_CONTEXT;
MQQueue myQueue = mqQueueManager.accessQueue(queueName, openOptions, null, null, null);
logger.info("Queue Obtained");
MQManagedObject moMyQueue = (MQManagedObject) myQueue;
moMyQueue.inquire(selectors, intAttrs, charAttrs);
int boThresh = intAttrs[0];
String backoutQname = new String(charAttrs);
logger.info("Backout Threshold: " + boThresh);
logger.info("Backout Queue: " + backoutQname);
} catch (MQException e) {
logger.warn("MQException Error obtaining threshold");
logger.warn(e.getMessage());
}
};
}
}