如何:实现 BatchMessageListenerContainer 以批量使用 JMS 队列
How to: Implement a BatchMessageListenerContainer for bulk consuming a JMS queue
我最近在 Spring 集成中遇到了对 JMS 消费者的需求 - 能够消耗大量的突发数据,而不会对我的目标 Oracle 数据库造成太多提交压力。
DefaultMessageListenerContainer 似乎只支持逐条消息事务。
我在谷歌上搜索了解决方案并找到了几个 - 但其中很多都不是通过从 DMLC 继承而是通过克隆和修改相同的原始源代码来实现的 - 这使得它很容易被破坏以防我以后希望移动到更新版本的 spring-jms。此外,被克隆的代码引用了 DMLC 的私有属性,因此必须将其排除在外。为了使这一切正常工作,还需要几个接口和一个自定义消息监听器。总而言之,我感觉不舒服。
那么 - 怎么办?
嗯 - 这是一个简单紧凑的解决方案,完全基于从 DefaultMessageListenerContainer 派生的单个 class。
虽然我只测试了消息驱动通道适配器和 ChainedTransactionManager - 因为这是需要做这样的事情时的基本场景。
这是代码:
package dk.itealisten.myservice.spring.components;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.Enumeration;
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
public static final int DEFAULT_BATCH_SIZE = 100;
public int batchSize = DEFAULT_BATCH_SIZE;
/**
* Override the method receiveMessage to return an instance of BatchMessage - an inner class being declared further down.
*/
@Override
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
BatchMessage batch = new BatchMessage();
while (!batch.releaseAfterMessage(super.receiveMessage(consumer))) ;
return batch.messages.size() == 0 ? null : batch;
}
/**
* As BatchMessage implements the javax.jms.Message interface it fits perfectly into the DMLC - only caveat is that SimpleMessageConverter dont know how to convert it to a Spring Integration Message - but that can be helped.
* As BatchMessage will only serve as a container to carry the actual javax.jms.Message's from DMLC to the MessageListener it need not provide meaningful implementations of the methods of the interface as long as they are there.
*/
protected class BatchMessage implements Message {
public ArrayList<Message> messages = new ArrayList<Message>();
/**
* Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener.
*/
public boolean releaseAfterMessage(Message message) {
if (message != null) {
messages.add(message);
}
// Are we ready to release?
return message == null || messages.size() >= batchSize;
}
// Below is only dummy-implementations of the abstract methods of javax.jms.Message
@Override
public String getJMSMessageID() throws JMSException {
return null;
}
@Override
public void setJMSMessageID(String s) throws JMSException {
}
@Override
public long getJMSTimestamp() throws JMSException {
return 0;
}
@Override
public void setJMSTimestamp(long l) throws JMSException {
}
@Override
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
return new byte[0];
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
}
@Override
public void setJMSCorrelationID(String s) throws JMSException {
}
@Override
public String getJMSCorrelationID() throws JMSException {
return null;
}
@Override
public Destination getJMSReplyTo() throws JMSException {
return null;
}
@Override
public void setJMSReplyTo(Destination destination) throws JMSException {
}
@Override
public Destination getJMSDestination() throws JMSException {
return null;
}
@Override
public void setJMSDestination(Destination destination) throws JMSException {
}
@Override
public int getJMSDeliveryMode() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryMode(int i) throws JMSException {
}
@Override
public boolean getJMSRedelivered() throws JMSException {
return false;
}
@Override
public void setJMSRedelivered(boolean b) throws JMSException {
}
@Override
public String getJMSType() throws JMSException {
return null;
}
@Override
public void setJMSType(String s) throws JMSException {
}
@Override
public long getJMSExpiration() throws JMSException {
return 0;
}
@Override
public void setJMSExpiration(long l) throws JMSException {
}
@Override
public long getJMSDeliveryTime() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryTime(long l) throws JMSException {
}
@Override
public int getJMSPriority() throws JMSException {
return 0;
}
@Override
public void setJMSPriority(int i) throws JMSException {
}
@Override
public void clearProperties() throws JMSException {
}
@Override
public boolean propertyExists(String s) throws JMSException {
return false;
}
@Override
public boolean getBooleanProperty(String s) throws JMSException {
return false;
}
@Override
public byte getByteProperty(String s) throws JMSException {
return 0;
}
@Override
public short getShortProperty(String s) throws JMSException {
return 0;
}
@Override
public int getIntProperty(String s) throws JMSException {
return 0;
}
@Override
public long getLongProperty(String s) throws JMSException {
return 0;
}
@Override
public float getFloatProperty(String s) throws JMSException {
return 0;
}
@Override
public double getDoubleProperty(String s) throws JMSException {
return 0;
}
@Override
public String getStringProperty(String s) throws JMSException {
return null;
}
@Override
public Object getObjectProperty(String s) throws JMSException {
return null;
}
@Override
public Enumeration getPropertyNames() throws JMSException {
return null;
}
@Override
public void setBooleanProperty(String s, boolean b) throws JMSException {
}
@Override
public void setByteProperty(String s, byte b) throws JMSException {
}
@Override
public void setShortProperty(String s, short i) throws JMSException {
}
@Override
public void setIntProperty(String s, int i) throws JMSException {
}
@Override
public void setLongProperty(String s, long l) throws JMSException {
}
@Override
public void setFloatProperty(String s, float v) throws JMSException {
}
@Override
public void setDoubleProperty(String s, double v) throws JMSException {
}
@Override
public void setStringProperty(String s, String s1) throws JMSException {
}
@Override
public void setObjectProperty(String s, Object o) throws JMSException {
}
@Override
public void acknowledge() throws JMSException {
}
@Override
public void clearBody() throws JMSException {
}
@Override
public <T> T getBody(Class<T> aClass) throws JMSException {
return null;
}
@Override
public boolean isBodyAssignableTo(Class aClass) throws JMSException {
return false;
}
}
}
下面的示例展示了如何在 Spring 应用程序上下文中使用它:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd">
<!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter -->
<jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer"
acknowledge="transacted"
channel="from.mq"
concurrent-consumers="5"
max-concurrent-consumers="15"
connection-factory="jmsConnectionFactory"
transaction-manager="transactionManager"
destination="my.mq.queue"
/>
<!-- Flow processing the BatchMessages being posted on the "from.mq" channel -->
<int:chain input-channel="from.mq" output-channel="nullChannel">
<int:splitter expression="payload.messages" />
<!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation -->
<int:transformer ref="smc" method="fromMessage"/>
<!-- And finally we persist -->
<int:service-activator ref="jdbcPublisher" method="persist"/>
</int:chain>
<!-- Various supporting beans -->
<!-- A bean to handle the database persistance -->
<bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" />
<!-- A bean to handle the conversion that could not take place in the MessageListener as it don't know how to convert a BatchMessage -->
<bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!-- Transaction manager must make sure messages are committed outbound (JDBC) before cleaned up inbound (JMS). -->
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
<constructor-arg name="transactionManagers">
<list>
<bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" />
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" />
</list>
</constructor-arg>
</bean>
我最近在 Spring 集成中遇到了对 JMS 消费者的需求 - 能够消耗大量的突发数据,而不会对我的目标 Oracle 数据库造成太多提交压力。
DefaultMessageListenerContainer 似乎只支持逐条消息事务。
我在谷歌上搜索了解决方案并找到了几个 - 但其中很多都不是通过从 DMLC 继承而是通过克隆和修改相同的原始源代码来实现的 - 这使得它很容易被破坏以防我以后希望移动到更新版本的 spring-jms。此外,被克隆的代码引用了 DMLC 的私有属性,因此必须将其排除在外。为了使这一切正常工作,还需要几个接口和一个自定义消息监听器。总而言之,我感觉不舒服。
那么 - 怎么办?
嗯 - 这是一个简单紧凑的解决方案,完全基于从 DefaultMessageListenerContainer 派生的单个 class。
虽然我只测试了消息驱动通道适配器和 ChainedTransactionManager - 因为这是需要做这样的事情时的基本场景。
这是代码:
package dk.itealisten.myservice.spring.components;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import java.util.ArrayList;
import java.util.Enumeration;
public class BatchMessageListenerContainer extends DefaultMessageListenerContainer {
public static final int DEFAULT_BATCH_SIZE = 100;
public int batchSize = DEFAULT_BATCH_SIZE;
/**
* Override the method receiveMessage to return an instance of BatchMessage - an inner class being declared further down.
*/
@Override
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
BatchMessage batch = new BatchMessage();
while (!batch.releaseAfterMessage(super.receiveMessage(consumer))) ;
return batch.messages.size() == 0 ? null : batch;
}
/**
* As BatchMessage implements the javax.jms.Message interface it fits perfectly into the DMLC - only caveat is that SimpleMessageConverter dont know how to convert it to a Spring Integration Message - but that can be helped.
* As BatchMessage will only serve as a container to carry the actual javax.jms.Message's from DMLC to the MessageListener it need not provide meaningful implementations of the methods of the interface as long as they are there.
*/
protected class BatchMessage implements Message {
public ArrayList<Message> messages = new ArrayList<Message>();
/**
* Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener.
*/
public boolean releaseAfterMessage(Message message) {
if (message != null) {
messages.add(message);
}
// Are we ready to release?
return message == null || messages.size() >= batchSize;
}
// Below is only dummy-implementations of the abstract methods of javax.jms.Message
@Override
public String getJMSMessageID() throws JMSException {
return null;
}
@Override
public void setJMSMessageID(String s) throws JMSException {
}
@Override
public long getJMSTimestamp() throws JMSException {
return 0;
}
@Override
public void setJMSTimestamp(long l) throws JMSException {
}
@Override
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
return new byte[0];
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
}
@Override
public void setJMSCorrelationID(String s) throws JMSException {
}
@Override
public String getJMSCorrelationID() throws JMSException {
return null;
}
@Override
public Destination getJMSReplyTo() throws JMSException {
return null;
}
@Override
public void setJMSReplyTo(Destination destination) throws JMSException {
}
@Override
public Destination getJMSDestination() throws JMSException {
return null;
}
@Override
public void setJMSDestination(Destination destination) throws JMSException {
}
@Override
public int getJMSDeliveryMode() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryMode(int i) throws JMSException {
}
@Override
public boolean getJMSRedelivered() throws JMSException {
return false;
}
@Override
public void setJMSRedelivered(boolean b) throws JMSException {
}
@Override
public String getJMSType() throws JMSException {
return null;
}
@Override
public void setJMSType(String s) throws JMSException {
}
@Override
public long getJMSExpiration() throws JMSException {
return 0;
}
@Override
public void setJMSExpiration(long l) throws JMSException {
}
@Override
public long getJMSDeliveryTime() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryTime(long l) throws JMSException {
}
@Override
public int getJMSPriority() throws JMSException {
return 0;
}
@Override
public void setJMSPriority(int i) throws JMSException {
}
@Override
public void clearProperties() throws JMSException {
}
@Override
public boolean propertyExists(String s) throws JMSException {
return false;
}
@Override
public boolean getBooleanProperty(String s) throws JMSException {
return false;
}
@Override
public byte getByteProperty(String s) throws JMSException {
return 0;
}
@Override
public short getShortProperty(String s) throws JMSException {
return 0;
}
@Override
public int getIntProperty(String s) throws JMSException {
return 0;
}
@Override
public long getLongProperty(String s) throws JMSException {
return 0;
}
@Override
public float getFloatProperty(String s) throws JMSException {
return 0;
}
@Override
public double getDoubleProperty(String s) throws JMSException {
return 0;
}
@Override
public String getStringProperty(String s) throws JMSException {
return null;
}
@Override
public Object getObjectProperty(String s) throws JMSException {
return null;
}
@Override
public Enumeration getPropertyNames() throws JMSException {
return null;
}
@Override
public void setBooleanProperty(String s, boolean b) throws JMSException {
}
@Override
public void setByteProperty(String s, byte b) throws JMSException {
}
@Override
public void setShortProperty(String s, short i) throws JMSException {
}
@Override
public void setIntProperty(String s, int i) throws JMSException {
}
@Override
public void setLongProperty(String s, long l) throws JMSException {
}
@Override
public void setFloatProperty(String s, float v) throws JMSException {
}
@Override
public void setDoubleProperty(String s, double v) throws JMSException {
}
@Override
public void setStringProperty(String s, String s1) throws JMSException {
}
@Override
public void setObjectProperty(String s, Object o) throws JMSException {
}
@Override
public void acknowledge() throws JMSException {
}
@Override
public void clearBody() throws JMSException {
}
@Override
public <T> T getBody(Class<T> aClass) throws JMSException {
return null;
}
@Override
public boolean isBodyAssignableTo(Class aClass) throws JMSException {
return false;
}
}
}
下面的示例展示了如何在 Spring 应用程序上下文中使用它:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd">
<!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter -->
<jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer"
acknowledge="transacted"
channel="from.mq"
concurrent-consumers="5"
max-concurrent-consumers="15"
connection-factory="jmsConnectionFactory"
transaction-manager="transactionManager"
destination="my.mq.queue"
/>
<!-- Flow processing the BatchMessages being posted on the "from.mq" channel -->
<int:chain input-channel="from.mq" output-channel="nullChannel">
<int:splitter expression="payload.messages" />
<!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation -->
<int:transformer ref="smc" method="fromMessage"/>
<!-- And finally we persist -->
<int:service-activator ref="jdbcPublisher" method="persist"/>
</int:chain>
<!-- Various supporting beans -->
<!-- A bean to handle the database persistance -->
<bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" />
<!-- A bean to handle the conversion that could not take place in the MessageListener as it don't know how to convert a BatchMessage -->
<bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
<!-- Transaction manager must make sure messages are committed outbound (JDBC) before cleaned up inbound (JMS). -->
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
<constructor-arg name="transactionManagers">
<list>
<bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" />
<bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" />
</list>
</constructor-arg>
</bean>