JMS 生产者和消费者花费太多时间。为什么?
JMS Producer and Consumer taking too much time. Why?
我使用 ActiveMQ 作为代理和 JMS 来接收异步。一旦消息进入队列,队列中的消息就会开始使用这些消息。
为此,我有书面的生产者和消费者代码。
一切正常,但整个过程对于 10000 条记录花费了大约 2-3 分钟的时间。(我使用了一个循环只是为了模拟)
以下是完整代码:
这是 JMS 生产者:
public class JmsMessageProducer
{
public void jmsListener(String obj) throws Exception
{
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:61616)"));
broker.start();
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try
{
long millis = System.currentTimeMillis() % 1000;
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue3");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new ConsumerMessageListener("Consumer3"));
connection.start();
producer = session.createProducer(queue);
for(int i=0; i<10000; i++)
{
String payload = "Important Task"+i;
Message msg = session.createTextMessage(payload);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
}
long millis2 = System.currentTimeMillis() % 1000;
long millis3 = millis2- millis;
System.out.println("time taken: "+ millis3 );
}
finally
{
if(producer != null)
{
producer.close();
}
if (session != null)
{
session.close();
}
if (connection != null)
{
connection.close();
}
broker.stop();
}
}
}
这是监听器代码:
public class ConsumerMessageListener implements MessageListener
{
private String consumerName;
public ConsumerMessageListener(String consumerName)
{
this.consumerName = consumerName;
}
DummyAdapter adapter = new DummyAdapter();
public void onMessage(Message message)
{
TextMessage textMessage = (TextMessage) message;
try
{
System.out.println(consumerName + " received "+ textMessage.getText());
// adapter.dummy(textMessage.getText());
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
我是第一次这样做。
谁能告诉我为什么这个过程要花太多时间?
我究竟做错了什么?
我不确定您期望的 SLA,但您的消息代理以 80 messages/sec(大约)的速率工作,这还不错。
您的代码的一个问题是 session.createProducer(queue)
是时间滞后的问题,因为这是一项代价高昂的操作(需要时间),您可以使用单个生产者对象来生成多条消息。
所以在for循环外创建MessageProducer
如下所示:
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10000; i++) {
String payload = "Important Task"+i;
Message msg = session.createTextMessage(payload);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
}
您必须 close
producer
和 session
对象都在 finally
块中。
P.S.: 另外,作为旁注,如果您将制作人 class
的名字命名为 JmsMessagePrducer
而不是通常的 JmsMessageListener
会更好我们仅对 JMS
消费者使用名称 Listener
。
更新:
I just want to know that is it good to process around 80
messages/sec.? that too without any operation after consuming. What if
add more tasks after message consumption from queue, like inserting in
db or some business operation.
在不知道哪个 Server/OS/etc 的情况下简单地说 80 messages/sec 更好或 50 messages/seconds 更好是不明智的...(需要考虑许多参数) .当谈到性能要求时,您首先需要 specify/define 要求。
如果您当前的代码处理大约 80 个 messages/sec,那么对于给定的条件,这就是您的应用程序(测试程序)基准。因此,如果您觉得它不能满足您的性能要求,那么您需要配置多个 JMS 侦听器来并行处理消息并更改您的设计。
我使用 ActiveMQ 作为代理和 JMS 来接收异步。一旦消息进入队列,队列中的消息就会开始使用这些消息。 为此,我有书面的生产者和消费者代码。 一切正常,但整个过程对于 10000 条记录花费了大约 2-3 分钟的时间。(我使用了一个循环只是为了模拟) 以下是完整代码:
这是 JMS 生产者:
public class JmsMessageProducer
{
public void jmsListener(String obj) throws Exception
{
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:61616)"));
broker.start();
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try
{
long millis = System.currentTimeMillis() % 1000;
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectionFactory.createConnection();
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue3");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new ConsumerMessageListener("Consumer3"));
connection.start();
producer = session.createProducer(queue);
for(int i=0; i<10000; i++)
{
String payload = "Important Task"+i;
Message msg = session.createTextMessage(payload);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
}
long millis2 = System.currentTimeMillis() % 1000;
long millis3 = millis2- millis;
System.out.println("time taken: "+ millis3 );
}
finally
{
if(producer != null)
{
producer.close();
}
if (session != null)
{
session.close();
}
if (connection != null)
{
connection.close();
}
broker.stop();
}
}
}
这是监听器代码:
public class ConsumerMessageListener implements MessageListener
{
private String consumerName;
public ConsumerMessageListener(String consumerName)
{
this.consumerName = consumerName;
}
DummyAdapter adapter = new DummyAdapter();
public void onMessage(Message message)
{
TextMessage textMessage = (TextMessage) message;
try
{
System.out.println(consumerName + " received "+ textMessage.getText());
// adapter.dummy(textMessage.getText());
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
我是第一次这样做。 谁能告诉我为什么这个过程要花太多时间? 我究竟做错了什么?
我不确定您期望的 SLA,但您的消息代理以 80 messages/sec(大约)的速率工作,这还不错。
您的代码的一个问题是 session.createProducer(queue)
是时间滞后的问题,因为这是一项代价高昂的操作(需要时间),您可以使用单个生产者对象来生成多条消息。
所以在for循环外创建MessageProducer
如下所示:
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10000; i++) {
String payload = "Important Task"+i;
Message msg = session.createTextMessage(payload);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
}
您必须 close
producer
和 session
对象都在 finally
块中。
P.S.: 另外,作为旁注,如果您将制作人 class
的名字命名为 JmsMessagePrducer
而不是通常的 JmsMessageListener
会更好我们仅对 JMS
消费者使用名称 Listener
。
更新:
I just want to know that is it good to process around 80 messages/sec.? that too without any operation after consuming. What if add more tasks after message consumption from queue, like inserting in db or some business operation.
在不知道哪个 Server/OS/etc 的情况下简单地说 80 messages/sec 更好或 50 messages/seconds 更好是不明智的...(需要考虑许多参数) .当谈到性能要求时,您首先需要 specify/define 要求。
如果您当前的代码处理大约 80 个 messages/sec,那么对于给定的条件,这就是您的应用程序(测试程序)基准。因此,如果您觉得它不能满足您的性能要求,那么您需要配置多个 JMS 侦听器来并行处理消息并更改您的设计。