WSO2 ESB + activeMQ
WSO2 ESB + activeMQ
我是 wso2 esb 和 jms 的新手。我将一些消息从 soapUI 发送到 wso2 esb。在我的 wso 序列中,已处理的消息会发送到 jms。是否有可能从 wso2 esb 设置此消息的 "time to live"?或者其他一些方式?
我在 AMQ 中添加了这个:
<policyEntry queue="myQueue">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
有点像
<property name="JMSExpiration" value="today+hour_long_value" scope="transport" type="STRING"></property>
按顺序没有影响。
我发现唯一可行的方法是创建自己的调解器,它将设置消息的生存时间并将其发送到队列。队列名是按顺序预设的,然后调用mediator:
<property xmlns="http://ws.apache.org/ns/synapse" name="qname" value="your_queue_name" scope="default" type="STRING"></property>
<class xmlns="http://ws.apache.org/ns/synapse" name="com.example.JMSMessageTimeToLiveMediator"></class>
调解员class:
public class JMSMessageTimeToLiveMediator extends AbstractMediator implements
ManagedLifecycle {
private static String CON_FACTORY_NAME = "QueueConnectionFactory";
private static String DEF_PROP_QNAME = "qname";
private static long TIME_TO_LIVE = 60000;
private static QueueConnectionFactory cf;
public boolean mediate(MessageContext context) {
Connection connection = null;
Session session = null;
try {
connection = cf.createQueueConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = (String) context.getProperty(DEF_PROP_QNAME);
Destination destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(TIME_TO_LIVE);
TextMessage message = session.createTextMessage(context
.getEnvelope().toString());
producer.send(message);
} catch (JMSException e) {
log.error("ProduceJMS ERROR: " + e.getClass() + " "
+ e.getMessage());
} catch (Exception e) {
log.error("ProduceJMS ERROR: " + e.getClass() + " "
+ e.getMessage());
} finally {
try {
session.close();
connection.close();
} catch (JMSException e) {
log.error("ProduceJMS ERROR: " + e.getMessage());
}
}
return true;
}
public void init(SynapseEnvironment emvironment) {
Hashtable<String, Object> environment = new Hashtable<String, Object>();
environment.put("java.naming.factory.initial",
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
log.debug("ProduceJMS INIT");
try {
InitialContext ic = new InitialContext(environment);
cf = (QueueConnectionFactory) ic.lookup(CON_FACTORY_NAME);
} catch (NamingException e) {
log.error("ProduceJMS INIT ERROR: " + e.getMessage());
}
}
public void destroy() {
}
}
如果您使用的是 JMS 消息存储,则只需设置 属性 JMS_PROD_TIME_TO_LIVE。
<property name="JMS_PROD_TIME_TO_LIVE" value="15000" />
<store messageStore="my_jms_message_store" />
使用 WSO2 ESB 4.9.0(使用突触版本 2.1.3-wso2v11)进行测试
您可以在 JmsProducer code
中找到更多信息
您可以使用相同的方式设置消息优先级 (属性 JMS_PROD_PRIORITY)。
我是 wso2 esb 和 jms 的新手。我将一些消息从 soapUI 发送到 wso2 esb。在我的 wso 序列中,已处理的消息会发送到 jms。是否有可能从 wso2 esb 设置此消息的 "time to live"?或者其他一些方式?
我在 AMQ 中添加了这个:
<policyEntry queue="myQueue">
<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
有点像
<property name="JMSExpiration" value="today+hour_long_value" scope="transport" type="STRING"></property>
按顺序没有影响。
我发现唯一可行的方法是创建自己的调解器,它将设置消息的生存时间并将其发送到队列。队列名是按顺序预设的,然后调用mediator:
<property xmlns="http://ws.apache.org/ns/synapse" name="qname" value="your_queue_name" scope="default" type="STRING"></property>
<class xmlns="http://ws.apache.org/ns/synapse" name="com.example.JMSMessageTimeToLiveMediator"></class>
调解员class:
public class JMSMessageTimeToLiveMediator extends AbstractMediator implements
ManagedLifecycle {
private static String CON_FACTORY_NAME = "QueueConnectionFactory";
private static String DEF_PROP_QNAME = "qname";
private static long TIME_TO_LIVE = 60000;
private static QueueConnectionFactory cf;
public boolean mediate(MessageContext context) {
Connection connection = null;
Session session = null;
try {
connection = cf.createQueueConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = (String) context.getProperty(DEF_PROP_QNAME);
Destination destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(TIME_TO_LIVE);
TextMessage message = session.createTextMessage(context
.getEnvelope().toString());
producer.send(message);
} catch (JMSException e) {
log.error("ProduceJMS ERROR: " + e.getClass() + " "
+ e.getMessage());
} catch (Exception e) {
log.error("ProduceJMS ERROR: " + e.getClass() + " "
+ e.getMessage());
} finally {
try {
session.close();
connection.close();
} catch (JMSException e) {
log.error("ProduceJMS ERROR: " + e.getMessage());
}
}
return true;
}
public void init(SynapseEnvironment emvironment) {
Hashtable<String, Object> environment = new Hashtable<String, Object>();
environment.put("java.naming.factory.initial",
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
log.debug("ProduceJMS INIT");
try {
InitialContext ic = new InitialContext(environment);
cf = (QueueConnectionFactory) ic.lookup(CON_FACTORY_NAME);
} catch (NamingException e) {
log.error("ProduceJMS INIT ERROR: " + e.getMessage());
}
}
public void destroy() {
}
}
如果您使用的是 JMS 消息存储,则只需设置 属性 JMS_PROD_TIME_TO_LIVE。
<property name="JMS_PROD_TIME_TO_LIVE" value="15000" />
<store messageStore="my_jms_message_store" />
使用 WSO2 ESB 4.9.0(使用突触版本 2.1.3-wso2v11)进行测试
您可以在 JmsProducer code
中找到更多信息您可以使用相同的方式设置消息优先级 (属性 JMS_PROD_PRIORITY)。