Trying to release connection to avoid maximum client connection error
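I'm trying to find a solution for the following error message without increasing the default connection limit from 1000 to 2000 or more.
Recently, I came across the following error when around 1000 messages were sent to the broker with a delay of 5 minutes, as shown in the code below.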
WARN | Could not accept connection : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
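Below is the code that continuously listens to ActiveMQ. As soon as it sees COMPLETE, it sends an email to the user after generating the file. Otherwise, it goes into the else block and sends the message to the broker again.
Inside the else block, I want to test closing the connection once I'm done sending the message. So I closed the connection inside a finally block, as shown below. Is this the correct way to handle it?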
@Component
public class DownloadConsumer {
    @Autowired
    private JavaMailSender javaMailSender;
    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();
    Connection connection;

    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
    public void processBrokerQueues(String message) throws DaoException, JMSException {
        try {
            RequestDao requestDao = (RequestDao) context.getBean("requestDao");
            String receivedStatus = requestDao.getRequestStatus(message);
            //Retrieve Username from the message to include in an email
            String[] parts = message.split("#");
            String userName = parts[1].trim();
            //Retrieve personnelID from the message to include in the webservice calls
            String personnelID = parts[3].trim();
            //Before sending this message, do the check for COMPLETE or ERROR etc
            if(receivedStatus.equals("COMPLETE")) {
                String latestUUID = requestDao.getUUID();
                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);
                requestDao.sendMessage(message,latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
                String recipientEmail = userName+"@organization.com";
                /*****************************************************\
                // START: EMAIL Related Code
                *******************************************************/
                MimeMessage msg = javaMailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                helper.setFrom("ABCResearch@organization.com");
                helper.setTo(recipientEmail);
                helper.setSubject("Requested Files !");
                helper.setText(emailMessage,true);
                javaMailSender.send(msg);
            }
            else {
                // Getting JMS connection from the server and starting it
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                // Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
                Destination destination = session.createQueue(subject);
                MessageProducer producer = session.createProducer(destination);
                //Sending message to the queue
                TextMessage toSendMessage = session.createTextMessage(message);
                long delay = 300 * 1000;
                long period =300 * 1000;
                toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                producer.send(toSendMessage);
            }
        }
        catch(Throwable th){
            th.printStackTrace();
        }
        finally {
            connection.close();
        }
    }

    // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"
    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
}
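The problem is in the code below, which opens a new connection for every message. Ideally you should call it once (and again only if the connection expires).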
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
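Get a session from it and close the session when you are done. Depending on your use case, you can even keep the session around for longer.
The reason you're getting "Exceeded the maximum number of allowed client connections" is that you're creating connections and not closing them. In other words, your application is "leaking" connections. To fix the leak you need to close the connection. Closing a JMS connection in a finally block is the generally accepted practice, so your code looks good there. However, you need to check for null in case there's a problem before the connection is actually created, e.g.: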
finally {
    if (connection != null) {
        connection.close();
    }
}
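The leak and the guarded finally block described above can be reproduced without a broker. The following is a minimal sketch under stated assumptions: `CappedBroker`, `Handle`, and `LeakDemo` are hypothetical stand-ins invented for illustration (they are not ActiveMQ or JMS classes), and the cap only loosely mimics the `maximumConnections=1000` setting from the log message.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker that enforces a connection cap.
final class CappedBroker {
    private final int maxConnections;
    private final AtomicInteger open = new AtomicInteger();

    CappedBroker(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Rejects the caller once the cap is reached, like the WARN in the question.
    Handle connect() {
        if (open.incrementAndGet() > maxConnections) {
            open.decrementAndGet();
            throw new IllegalStateException(
                    "Exceeded the maximum number of allowed client connections");
        }
        return new Handle();
    }

    int openCount() {
        return open.get();
    }

    // Hypothetical connection handle; close() is idempotent.
    // Not thread-safe per handle, which is enough for this single-threaded sketch.
    final class Handle {
        private boolean closed;

        void close() {
            if (!closed) {
                closed = true;
                open.decrementAndGet();
            }
        }
    }
}

final class LeakDemo {
    // Opens a connection per message and never closes it: this is the leak.
    static void sendLeaky(CappedBroker broker) {
        broker.connect();
    }

    // Opens a connection per message and always releases it.
    // The null check covers the case where connect() itself failed.
    static void sendAndClose(CappedBroker broker) {
        CappedBroker.Handle handle = null;
        try {
            handle = broker.connect();
            // a real client would send the message here
        } finally {
            if (handle != null) {
                handle.close();
            }
        }
    }
}
```

With a cap of 1000, the leaky variant is rejected on the 1001st message, while the closing variant can run indefinitely because the open count returns to zero after every call.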
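That said, it's worth noting that creating and closing a JMS connection, session, and producer to send a single message is a well-known anti-pattern. It would be better if you cached the connection (e.g. in a static variable) and re-used it. For example: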
@Component
public class DownloadConsumer {
    @Autowired
    private JavaMailSender javaMailSender;
    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();
    private static Connection connection;
    private static Object connectionLock = new Object();
    // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"
    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);

    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
    public void processBrokerQueues(String message) throws DaoException, JMSException {
        try {
            RequestDao requestDao = (RequestDao) context.getBean("requestDao");
            String receivedStatus = requestDao.getRequestStatus(message);
            //Retrieve Username from the message to include in an email
            String[] parts = message.split("#");
            String userName = parts[1].trim();
            //Retrieve personnelID from the message to include in the webservice calls
            String personnelID = parts[3].trim();
            //Before sending this message, do the check for COMPLETE or ERROR etc
            if (receivedStatus.equals("COMPLETE")) {
                String latestUUID = requestDao.getUUID();
                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);
                requestDao.sendMessage(message, latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
                String recipientEmail = userName + "@organization.com";
                /*****************************************************\
                // START: EMAIL Related Code
                *******************************************************/
                MimeMessage msg = javaMailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                helper.setFrom("ABCResearch@organization.com");
                helper.setTo(recipientEmail);
                helper.setSubject("Requested Files !");
                helper.setText(emailMessage, true);
                javaMailSender.send(msg);
            } else {
                try {
                    createConnection();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    // Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
                    Destination destination = session.createQueue(subject);
                    MessageProducer producer = session.createProducer(destination);
                    //Sending message to the queue
                    TextMessage toSendMessage = session.createTextMessage(message);
                    long delay = 300 * 1000;
                    long period = 300 * 1000;
                    toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                    producer.send(toSendMessage);
                    // Close the per-message session (this also closes its producer);
                    // the connection itself stays open for re-use
                    session.close();
                } catch (Throwable th) {
                    th.printStackTrace();
                    synchronized (connectionLock) {
                        // if there are any problems close the connection and it will be re-created next time
                        if (connection != null) {
                            connection.close();
                            connection = null;
                        }
                    }
                }
            }
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    // createConnection() on the factory throws JMSException, so it must be declared here
    private void createConnection() throws JMSException {
        synchronized (connectionLock) {
            if (connection == null) {
                // Getting JMS connection from the server
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
            }
        }
    }
}
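You'll notice that in this code there's no finally block to close the connection. That's intentional, because the whole point of this code is to keep the connection open so that it isn't opened and closed to send a single message. The connection is re-used between invocations. The only time the connection is closed is when a Throwable is caught.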
Also, keep in mind that there's no reason to call start() on the JMS connection if you're just sending a message. The start() method only affects consumers.
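The cache-and-reset idea from this answer can also be exercised in isolation. This is only a sketch under stated assumptions: `FakeConnection` and `CachedSender` are hypothetical classes made up for illustration, with no JMS dependency, and the failure is simulated by rejecting a null body. It shows one connection being created lazily, shared across sends, and discarded after a failure so the next send reconnects.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker connection; counts creations and open handles.
final class FakeConnection implements AutoCloseable {
    static final AtomicInteger CREATED = new AtomicInteger();
    static final AtomicInteger OPEN = new AtomicInteger();

    FakeConnection() {
        CREATED.incrementAndGet();
        OPEN.incrementAndGet();
    }

    // Simulates a send; a null body stands in for any send failure.
    void send(String body) {
        if (body == null) {
            throw new IllegalArgumentException("body must not be null");
        }
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

// Caches one connection and drops it after a failed send.
final class CachedSender {
    private final Object lock = new Object();
    private FakeConnection connection;

    void send(String body) {
        FakeConnection current;
        synchronized (lock) {
            if (connection == null) {
                connection = new FakeConnection(); // created lazily, once
            }
            current = connection;
        }
        try {
            current.send(body);
        } catch (RuntimeException e) {
            synchronized (lock) {
                // Only discard the connection this call actually used,
                // so a concurrent failure cannot close it twice.
                if (connection == current) {
                    current.close();
                    connection = null;
                }
            }
            throw e;
        }
    }
}
```

Sending 1000 messages through one `CachedSender` creates a single connection. A failed send closes it, and the following send creates a second one.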