消费者在 Wildfly AS 上导致内存泄漏
Consumer causing memory leak on Wildfly AS
我有一个 producer
consumer
设置,使用 Wildfly
作为 JMS
,生产者每 1 分钟使用一个 newFixedThreadPool(126)
每个线程拉动从 REST
服务下载数据,并将其推送到 Wildfly
AS 上的 HornetQ
。
然后在消费者方面,我有一个消费 HornetQ
中的消息的消费者 class 和一个简单的 Parser
class,用于数据库插入,我实例化我的 onMessage()
中类型 Parser
的对象,然后将消息传递给它,消息在 JSON
中,我的解析器 class 循环遍历它获取值并插入它们进入我的数据库。
消费者:
public void Consume(Consumer asyncReceiver) throws Throwable {
try {
/** Get the initial context */
final Properties props = new Properties();
/** If debugging in IDE the properties are acceded this way */
if(debug){
InputStream f = getClass().getClassLoader().getResourceAsStream("consumer.properties");
props.load(f);
}
/** If running the .jar artifact the properties are acceded this way*/
else{
File jarPath = new File(getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
String propertiesPath = jarPath.getParentFile().getAbsolutePath();
props.load(new FileInputStream(propertiesPath + File.separator + "consumer.properties"));
}
/** These few lines should be removed and setup in the properties file*/
props.put(Context.INITIAL_CONTEXT_FACTORY, props.getProperty("INITIAL_CONTEXT_FACTORY"));
props.put(Context.PROVIDER_URL, props.getProperty("PROVIDER_URL"));
props.put(Context.SECURITY_PRINCIPAL, props.getProperty("DEFAULT_USERNAME"));
props.put(Context.SECURITY_CREDENTIALS, props.getProperty("DEFAULT_PASSWORD"));
context = new InitialContext(props);
/** Lookup the queue object */
Queue queue = (Queue) context.lookup(props.getProperty("DEFAULT_DESTINATION"));
/** Lookup the queue connection factory */
ConnectionFactory connFactory = (ConnectionFactory) context.lookup(props.getProperty("DEFAULT_CONNECTION_FACTORY"));
/** Create a queue connection */
connection = connFactory.createConnection(props.getProperty("DEFAULT_USERNAME"), props.getProperty("DEFAULT_PASSWORD"));
/** Create a queue session */
queueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** Create a queue consumer */
msgConsumer = queueSession.createConsumer(queue);
/** Set an asynchronous message listener */
msgConsumer.setMessageListener(asyncReceiver);
/** Set an asynchronous exception listener on the connection */
connection.setExceptionListener(asyncReceiver);
/** Start connection */
connection.start();
/** Wait for messages */
System.out.println("waiting for messages");
for (int i = 0; i < 47483647; i++) {
Thread.sleep(1000);
System.out.print(".");
}
System.out.println();
} catch (Exception e) {
log.severe(e.getMessage());
throw e;
}finally {
if (context != null) {
context.close();
}
if (queueSession != null)
{ queueSession.close();
}
if(msgConsumer != null){
msgConsumer.close();
}
if (connection != null) {
connection.close();
}
}
}
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
Parser parser = new Parser();
parser.parseApplication(msg.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
解析器:
public void parseApplication(String NRData) throws Exception {
DBConnection db = DBConnection.createApplication();
db.getConnection();
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
JsonNode rootNode = mapper.readTree(NRData);
Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String, JsonNode> field = fieldsIterator.next();
String envName = field.getKey();
JsonNode appValue = field.getValue();
JSONArray jsonArray = new JSONArray(appValue.toString());
String appName = jsonArray.getString(0);
String appID = jsonArray.getString(1);
JSONObject json = jsonArray.getJSONObject(2);
JSONObject metricsData = json.getJSONObject("metric_data");
JSONArray metrics = metricsData.getJSONArray("metrics");
JSONObject array1 = metrics.getJSONObject(0);
JSONArray timeslices = array1.getJSONArray("timeslices");
for (int i = 0; i < timeslices.length(); i++) {
JSONObject array2 = timeslices.getJSONObject(i);
JSONObject values = array2.getJSONObject("values");
// Instant from = array2.getString("from");
Instant from = TimestampUtils.parseTimestamp(array2.get("from").toString(), null);
Instant to = TimestampUtils.parseTimestamp(array2.get("to").toString(), null);
Iterator<String> nameItr = values.keys();
while (nameItr.hasNext()) {
String name = nameItr.next();
System.out.println(
"\nEnv name: " + envName +
"\nApp name: " + appName +
"\nApp ID: " + appID +
"\nRequest per minute: " + values.getDouble(name) +
"\nFrom: " + from + " To: " + to);
ThroughputEntry TP = new ThroughputEntry();
TP.setThroughput(values.getDouble(name));
TP.setEnvironment(envName);
TP.setName(appName);
TP.setRetrieved(from);
TP.setPeriodEnd(to);
db.addHistory(TP);
}
}
}
}
数据库连接:
public class DBConnection {
private final String table;
/**
* Set the table name for applications
*/
public static DBConnection createApplication() {
return new DBConnection("APPLICATIONDATA");
}
public DBConnection(String table) {
this.table = String.format("NRDATA.%s", table);
}
public Connection getConnection() throws IllegalAccessException,
InstantiationException, ClassNotFoundException, SQLException {
try {
Class.forName("COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
System.out.println("Connecting to database...");
Connection connection = DriverManager.getConnection(DB2url, user, password);
System.out.println("From DAO, connection obtained ");
return connection;
}
public boolean addHistory(ThroughputEntry entry) throws Exception {
try (Connection connection = getConnection()) {
Statement statement = connection.createStatement();
return 0 < statement
.executeUpdate(String
.format("INSERT INTO " + table
+ "(ID, RETRIEVED, PERIOD_END, ENVIRONMENT, APPNAME, THROUGHPUT)"
+ "VALUES ('%s', '%s', '%s', '%s', '%s', %s)",
entry.getUUID(), entry.getRetrieved(),
entry.getPeriodEnd(), entry.getEnvironment(),
entry.getName(), entry.getThroughput()));
}
}
}
所以我的问题是 Wildfly
AS 出现内存泄漏,我认为问题可能出在我的消费者身上。
所以有几个问题:
我是否应该在将 onMessage()
方法中收到的消息缓冲到消费者上,然后再将它们插入数据库?
如果我收到太多消息,这是否会导致泄漏?消费者是否向 Wildfly
AS 发送任何类型的 og Ack
?
我有消费者 运行 无限期地循环,也许这是错误的也许它应该休眠或等待。
我已经花了 2 天时间尝试解决这个问题,非常感谢任何帮助。
你应该关闭任何需要关闭的东西。我正在查看前几行,我已经看到两个未关闭的流。
检查您的代码,任何实现 AutoCloseable
的代码都需要正确关闭。使用 try-with-resources
来做到这一点。
IDE 已经可以为您指出可能的资源泄漏,例如在 Java Compiler>Errors/Warnings
.
中的 Eclipse 调整警告中
编辑:您编辑了您的问题,现在可以看到明显的漏洞。在 parseApplication
方法中有语句 db.getConnection()
。该方法创建了一个您永远不会使用的连接并且您永远不会关闭它..
我有一个 producer
consumer
设置,使用 Wildfly
作为 JMS
,生产者每 1 分钟使用一个 newFixedThreadPool(126)
每个线程拉动从 REST
服务下载数据,并将其推送到 Wildfly
AS 上的 HornetQ
。
然后在消费者方面,我有一个消费 HornetQ
中的消息的消费者 class 和一个简单的 Parser
class,用于数据库插入,我实例化我的 onMessage()
中类型 Parser
的对象,然后将消息传递给它,消息在 JSON
中,我的解析器 class 循环遍历它获取值并插入它们进入我的数据库。
消费者:
public void Consume(Consumer asyncReceiver) throws Throwable {
try {
/** Get the initial context */
final Properties props = new Properties();
/** If debugging in IDE the properties are acceded this way */
if(debug){
InputStream f = getClass().getClassLoader().getResourceAsStream("consumer.properties");
props.load(f);
}
/** If running the .jar artifact the properties are acceded this way*/
else{
File jarPath = new File(getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
String propertiesPath = jarPath.getParentFile().getAbsolutePath();
props.load(new FileInputStream(propertiesPath + File.separator + "consumer.properties"));
}
/** These few lines should be removed and setup in the properties file*/
props.put(Context.INITIAL_CONTEXT_FACTORY, props.getProperty("INITIAL_CONTEXT_FACTORY"));
props.put(Context.PROVIDER_URL, props.getProperty("PROVIDER_URL"));
props.put(Context.SECURITY_PRINCIPAL, props.getProperty("DEFAULT_USERNAME"));
props.put(Context.SECURITY_CREDENTIALS, props.getProperty("DEFAULT_PASSWORD"));
context = new InitialContext(props);
/** Lookup the queue object */
Queue queue = (Queue) context.lookup(props.getProperty("DEFAULT_DESTINATION"));
/** Lookup the queue connection factory */
ConnectionFactory connFactory = (ConnectionFactory) context.lookup(props.getProperty("DEFAULT_CONNECTION_FACTORY"));
/** Create a queue connection */
connection = connFactory.createConnection(props.getProperty("DEFAULT_USERNAME"), props.getProperty("DEFAULT_PASSWORD"));
/** Create a queue session */
queueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** Create a queue consumer */
msgConsumer = queueSession.createConsumer(queue);
/** Set an asynchronous message listener */
msgConsumer.setMessageListener(asyncReceiver);
/** Set an asynchronous exception listener on the connection */
connection.setExceptionListener(asyncReceiver);
/** Start connection */
connection.start();
/** Wait for messages */
System.out.println("waiting for messages");
for (int i = 0; i < 47483647; i++) {
Thread.sleep(1000);
System.out.print(".");
}
System.out.println();
} catch (Exception e) {
log.severe(e.getMessage());
throw e;
}finally {
if (context != null) {
context.close();
}
if (queueSession != null)
{ queueSession.close();
}
if(msgConsumer != null){
msgConsumer.close();
}
if (connection != null) {
connection.close();
}
}
}
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
Parser parser = new Parser();
parser.parseApplication(msg.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
解析器:
public void parseApplication(String NRData) throws Exception {
DBConnection db = DBConnection.createApplication();
db.getConnection();
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
JsonNode rootNode = mapper.readTree(NRData);
Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String, JsonNode> field = fieldsIterator.next();
String envName = field.getKey();
JsonNode appValue = field.getValue();
JSONArray jsonArray = new JSONArray(appValue.toString());
String appName = jsonArray.getString(0);
String appID = jsonArray.getString(1);
JSONObject json = jsonArray.getJSONObject(2);
JSONObject metricsData = json.getJSONObject("metric_data");
JSONArray metrics = metricsData.getJSONArray("metrics");
JSONObject array1 = metrics.getJSONObject(0);
JSONArray timeslices = array1.getJSONArray("timeslices");
for (int i = 0; i < timeslices.length(); i++) {
JSONObject array2 = timeslices.getJSONObject(i);
JSONObject values = array2.getJSONObject("values");
// Instant from = array2.getString("from");
Instant from = TimestampUtils.parseTimestamp(array2.get("from").toString(), null);
Instant to = TimestampUtils.parseTimestamp(array2.get("to").toString(), null);
Iterator<String> nameItr = values.keys();
while (nameItr.hasNext()) {
String name = nameItr.next();
System.out.println(
"\nEnv name: " + envName +
"\nApp name: " + appName +
"\nApp ID: " + appID +
"\nRequest per minute: " + values.getDouble(name) +
"\nFrom: " + from + " To: " + to);
ThroughputEntry TP = new ThroughputEntry();
TP.setThroughput(values.getDouble(name));
TP.setEnvironment(envName);
TP.setName(appName);
TP.setRetrieved(from);
TP.setPeriodEnd(to);
db.addHistory(TP);
}
}
}
}
数据库连接:
public class DBConnection {
private final String table;
/**
* Set the table name for applications
*/
public static DBConnection createApplication() {
return new DBConnection("APPLICATIONDATA");
}
public DBConnection(String table) {
this.table = String.format("NRDATA.%s", table);
}
public Connection getConnection() throws IllegalAccessException,
InstantiationException, ClassNotFoundException, SQLException {
try {
Class.forName("COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
System.out.println("Connecting to database...");
Connection connection = DriverManager.getConnection(DB2url, user, password);
System.out.println("From DAO, connection obtained ");
return connection;
}
public boolean addHistory(ThroughputEntry entry) throws Exception {
try (Connection connection = getConnection()) {
Statement statement = connection.createStatement();
return 0 < statement
.executeUpdate(String
.format("INSERT INTO " + table
+ "(ID, RETRIEVED, PERIOD_END, ENVIRONMENT, APPNAME, THROUGHPUT)"
+ "VALUES ('%s', '%s', '%s', '%s', '%s', %s)",
entry.getUUID(), entry.getRetrieved(),
entry.getPeriodEnd(), entry.getEnvironment(),
entry.getName(), entry.getThroughput()));
}
}
}
所以我的问题是 Wildfly
AS 出现内存泄漏,我认为问题可能出在我的消费者身上。
所以有几个问题:
我是否应该在将 onMessage()
方法中收到的消息缓冲到消费者上,然后再将它们插入数据库?
如果我收到太多消息,这是否会导致泄漏?消费者是否向 Wildfly
AS 发送任何类型的 og Ack
?
我有消费者 运行 无限期地循环,也许这是错误的也许它应该休眠或等待。
我已经花了 2 天时间尝试解决这个问题,非常感谢任何帮助。
你应该关闭任何需要关闭的东西。我正在查看前几行,我已经看到两个未关闭的流。
检查您的代码,任何实现 AutoCloseable
的代码都需要正确关闭。使用 try-with-resources
来做到这一点。
IDE 已经可以为您指出可能的资源泄漏,例如在 Java Compiler>Errors/Warnings
.
编辑:您编辑了您的问题,现在可以看到明显的漏洞。在 parseApplication
方法中有语句 db.getConnection()
。该方法创建了一个您永远不会使用的连接并且您永远不会关闭它..