ActiveMQ Artemis 2.10.1 忽略 TomEE 中的重试设置
ActiveMQ Artemis 2.10.1 ignoring retry settings in TomEE
我下载了最新的 ActiveMQ Artemis 2.10.1(Windows 10,JDK 8),但无法使 address-settings 生效。阅读了在线文档(内容不多)之后,我在 broker.xml 中添加了以下内容:
<!-- Per-address policy for BETATESTQ: names the dead-letter and expiry addresses
     and sets the redelivery delay, multiplier, collision-avoidance factor,
     maximum delay and maximum number of delivery attempts. -->
<address-settings>
<address-setting match="BETATESTQ">
<dead-letter-address>BETATESTQ_DLQ</dead-letter-address>
<expiry-address>BETATESTQ_EXPIRY</expiry-address>
<!-- NOTE(review): delay values are presumably milliseconds; confirm against the Artemis address-settings documentation. -->
<redelivery-delay>30000</redelivery-delay>
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<max-redelivery-delay>100000</max-redelivery-delay>
<max-delivery-attempts>999</max-delivery-attempts>
</address-setting>
</address-settings>
<!-- The three addresses used above (dead-letter, expiry and the work queue itself),
     each declared with a single anycast queue of the same name. -->
<addresses>
<address name="BETATESTQ_DLQ">
<anycast>
<queue name="BETATESTQ_DLQ" />
</anycast>
</address>
<address name="BETATESTQ_EXPIRY">
<anycast>
<queue name="BETATESTQ_EXPIRY" />
</anycast>
</address>
<address name="BETATESTQ">
<anycast>
<queue name="BETATESTQ" />
</anycast>
</address>
</addresses>
除此之外,其余内容都是创建代理时生成的默认 broker.xml。
redelivery-delay、max-redelivery-delay、max-delivery-attempts 似乎总是使用默认值,而死信地址和过期地址的值确实被正确读取了。我可以在 TomEE 日志中看到消息被重试,也能在 Artemis 控制台中看到这些消息,重试结束后消息会被移至死信队列。
最初将消息放入队列时,我没有传入任何与投递或重试相关的数据(只使用了最精简的 Java J2EE 代码)。
如何让它不忽略 redelivery-delay、max-redelivery-delay、max-delivery-attempts?
我使用 Apache TomEE 连接到队列(未使用 TomEE 中的内置 ActiveMQ 5.x,而是将其指向 ActiveMQ Artemis)
JMS 侦听器:
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.*;
@MessageDriven(name = "BETATESTQ", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "BETATESTQ"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
})
public class BetaTestQueueListener implements MessageListener, java.io.Serializable {
private static final long serialVersionUID = 1L;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message rcvMessage) {
System.out.println("omMessage throw runtime exception");
throw new RuntimeException("trigger retry");
}
}
12/20/19 - 我找到的适用于 TomEE 8.0.0 的可用配置:
tomee.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- TomEE configuration that points the container at an external ActiveMQ Artemis
     broker through the Artemis JCA resource adapter. -->
<tomee>
<!-- Artemis resource adapter: Netty connector to 127.0.0.1:61617 with SSL enabled.
     NOTE(review): credentials and key store passwords are stored in plain text here;
     confirm this is acceptable for the target environment. -->
<Resource id="artemis" class-name="org.apache.activemq.artemis.ra.ActiveMQResourceAdapter">
ConnectorClassName=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
ConnectionParameters=host=127.0.0.1;port=61617;needClientAuth=false;sslEnabled=true;keyStorePath=../ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=../ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
UserName=admin
Password=admin
JndiParams=java.naming.factory.initial=org.apache.openejb.core.OpenEJBInitialContextFactory
</Resource>
<!-- XA connection factory used by application code to send messages to the same broker. -->
<Resource id="MyJmsConnectionFactory"
class-name="org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory"
constructor="uri,username,password"
type="javax.jms.ConnectionFactory">
uri=tcp://localhost:61617?needClientAuth=false;sslEnabled=true;keyStorePath=C:/apache-tomee-plus-8.0.0/ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=C:/apache-tomee-plus-8.0.0/ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
username=admin
password=admin
TransactionSupport=xa
PoolMaxSize=20
PoolMinSize=0
</Resource>
<!-- Queue resources created through ActiveMQJMSClient.createQueue(name). -->
<Resource id="BETATESTQ_DLQ"
class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
constructor="name"
factory-name="createQueue"
type="javax.jms.Queue">
name=BETATESTQ_DLQ
</Resource>
<Resource id="BETATESTQ"
class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
constructor="name"
factory-name="createQueue"
type="javax.jms.Queue">
name=BETATESTQ
</Resource>
<!-- Message-driven bean container bound to the "artemis" resource adapter declared above. -->
<Container id="mdb" type="MESSAGE">
InstanceLimit = -1
ResourceAdapter = artemis
ActivationSpecClass = org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec
</Container>
</tomee>
用于发送 JMS 消息的 Java EE 类:
import java.util.*;
import javax.jms.*;
import org.slf4j.*;
/**
 * Minimal JMS publisher that sends a single text message to a queue using a
 * transacted session.
 */
public final class JmsPublisherInstance2 implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JmsPublisherInstance2.class);

    /**
     * Sends {@code msg} as a {@link TextMessage} to {@code queue} and commits the session.
     *
     * <p>On failure the session is rolled back (when one was created), the error is
     * logged and the original exception is rethrown. The session and the connection
     * are always closed.
     *
     * @param msg               text payload of the message
     * @param connectionFactory factory used to open the connection
     * @param queue             destination queue
     * @throws Exception the failure raised while connecting, sending or committing
     */
    public void send(String msg,
                     ConnectionFactory connectionFactory,
                     Queue queue) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(queue);
            TextMessage message = session.createTextMessage(msg);
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            rollbackQuietly(session, e);
            LOG.error(e.getMessage(), e);
            throw e;
        } finally {
            // Nested finally so the connection is closed even if closing the session throws.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Rolls back {@code session} if it exists; a rollback failure is attached to
     * {@code cause} as a suppressed exception so it cannot hide the original error.
     */
    private static void rollbackQuietly(Session session, Exception cause) {
        if (session == null) {
            // The failure happened before a session was created; nothing to roll back.
            return;
        }
        try {
            session.rollback();
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}
Java EE 侦听器:
import java.io.*;
import javax.annotation.*;
import javax.ejb.*;
import javax.jms.*;
@MessageDriven ( name = "BetaTESTQMDB" , activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue = "javax.jms.Queue") ,
@ActivationConfigProperty(propertyName="destination", propertyValue = "BetaTESTQ"),
@ActivationConfigProperty(propertyName="maxSession", propertyValue = "5"),
@ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue = "Auto-acknowledge")
})
public class BetaTestQueueListener implements MessageListener, java.io.Serializable {
private static final long serialVersionUID = 1L;
@Resource(name="MyJmsConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource
private MessageDrivenContext mdbContext;
@Resource(name = "BETATESTQ")
private javax.jms.Queue betaTestQ;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message rcvMessage) {
try {
jmsInstance.send("test message", connectionFactory, betaTestQ);
} catch (Throwable t) {
t.printStackTrace();
mdbContext.setRollbackOnly();
}
}
}
您似乎正在使用 OpenWire JMS 客户端。这会使情况变得复杂,因为据我所知,OpenWire JMS 客户端是在客户端自身实现重新投递的,相应的重新投递语义也是在客户端而不是代理端配置的。由于 OpenWire 客户端自行处理一切,并且不会把投递失败通知给代理,代理没有机会应用任何重新投递策略。This documentation 可以帮助您为 OpenWire 客户端配置重新投递。
话虽如此,我建议参考 this tutorial。它演示了如何在无需构建/部署 JCA RA 的情况下集成 TomEE 和 ActiveMQ Artemis。这样您就可以使用 Core JMS 客户端,而不是 OpenWire JMS 客户端。
如果你想走 RA 路线,可以在 examples/features/sub-modules/artemis-ra-rar/ 目录下运行 mvn verify 来构建 ActiveMQ Artemis JCA RA。构建出的 RA 位于 target 目录中,文件名为 artemis-rar-<version>.rar。
我下载了最新的 ActiveMQ Artemis 2.10.1(Windows 10,JDK 8),但无法使 address-settings 生效。阅读了在线文档(内容不多)之后,我在 broker.xml 中添加了以下内容:
<!-- Per-address policy for BETATESTQ: names the dead-letter and expiry addresses
     and sets the redelivery delay, multiplier, collision-avoidance factor,
     maximum delay and maximum number of delivery attempts. -->
<address-settings>
<address-setting match="BETATESTQ">
<dead-letter-address>BETATESTQ_DLQ</dead-letter-address>
<expiry-address>BETATESTQ_EXPIRY</expiry-address>
<!-- NOTE(review): delay values are presumably milliseconds; confirm against the Artemis address-settings documentation. -->
<redelivery-delay>30000</redelivery-delay>
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<max-redelivery-delay>100000</max-redelivery-delay>
<max-delivery-attempts>999</max-delivery-attempts>
</address-setting>
</address-settings>
<!-- The three addresses used above (dead-letter, expiry and the work queue itself),
     each declared with a single anycast queue of the same name. -->
<addresses>
<address name="BETATESTQ_DLQ">
<anycast>
<queue name="BETATESTQ_DLQ" />
</anycast>
</address>
<address name="BETATESTQ_EXPIRY">
<anycast>
<queue name="BETATESTQ_EXPIRY" />
</anycast>
</address>
<address name="BETATESTQ">
<anycast>
<queue name="BETATESTQ" />
</anycast>
</address>
</addresses>
除此之外,其余内容都是创建代理时生成的默认 broker.xml。
redelivery-delay、max-redelivery-delay、max-delivery-attempts 似乎总是使用默认值,而死信地址和过期地址的值确实被正确读取了。我可以在 TomEE 日志中看到消息被重试,也能在 Artemis 控制台中看到这些消息,重试结束后消息会被移至死信队列。
最初将消息放入队列时,我没有传入任何与投递或重试相关的数据(只使用了最精简的 Java J2EE 代码)。
如何让它不忽略 redelivery-delay、max-redelivery-delay、max-delivery-attempts?
我使用 Apache TomEE 连接到队列(未使用 TomEE 中的内置 ActiveMQ 5.x,而是将其指向 ActiveMQ Artemis)
JMS 侦听器:
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.*;
@MessageDriven(name = "BETATESTQ", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "BETATESTQ"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
})
public class BetaTestQueueListener implements MessageListener, java.io.Serializable {
private static final long serialVersionUID = 1L;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message rcvMessage) {
System.out.println("omMessage throw runtime exception");
throw new RuntimeException("trigger retry");
}
}
12/20/19 - 我找到的适用于 TomEE 8.0.0 的可用配置:
tomee.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- TomEE configuration that points the container at an external ActiveMQ Artemis
     broker through the Artemis JCA resource adapter. -->
<tomee>
<!-- Artemis resource adapter: Netty connector to 127.0.0.1:61617 with SSL enabled.
     NOTE(review): credentials and key store passwords are stored in plain text here;
     confirm this is acceptable for the target environment. -->
<Resource id="artemis" class-name="org.apache.activemq.artemis.ra.ActiveMQResourceAdapter">
ConnectorClassName=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
ConnectionParameters=host=127.0.0.1;port=61617;needClientAuth=false;sslEnabled=true;keyStorePath=../ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=../ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
UserName=admin
Password=admin
JndiParams=java.naming.factory.initial=org.apache.openejb.core.OpenEJBInitialContextFactory
</Resource>
<!-- XA connection factory used by application code to send messages to the same broker. -->
<Resource id="MyJmsConnectionFactory"
class-name="org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory"
constructor="uri,username,password"
type="javax.jms.ConnectionFactory">
uri=tcp://localhost:61617?needClientAuth=false;sslEnabled=true;keyStorePath=C:/apache-tomee-plus-8.0.0/ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=C:/apache-tomee-plus-8.0.0/ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
username=admin
password=admin
TransactionSupport=xa
PoolMaxSize=20
PoolMinSize=0
</Resource>
<!-- Queue resources created through ActiveMQJMSClient.createQueue(name). -->
<Resource id="BETATESTQ_DLQ"
class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
constructor="name"
factory-name="createQueue"
type="javax.jms.Queue">
name=BETATESTQ_DLQ
</Resource>
<Resource id="BETATESTQ"
class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
constructor="name"
factory-name="createQueue"
type="javax.jms.Queue">
name=BETATESTQ
</Resource>
<!-- Message-driven bean container bound to the "artemis" resource adapter declared above. -->
<Container id="mdb" type="MESSAGE">
InstanceLimit = -1
ResourceAdapter = artemis
ActivationSpecClass = org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec
</Container>
</tomee>
用于发送 JMS 消息的 Java EE 类:
import java.util.*;
import javax.jms.*;
import org.slf4j.*;
/**
 * Minimal JMS publisher that sends a single text message to a queue using a
 * transacted session.
 */
public final class JmsPublisherInstance2 implements java.io.Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JmsPublisherInstance2.class);

    /**
     * Sends {@code msg} as a {@link TextMessage} to {@code queue} and commits the session.
     *
     * <p>On failure the session is rolled back (when one was created), the error is
     * logged and the original exception is rethrown. The session and the connection
     * are always closed.
     *
     * @param msg               text payload of the message
     * @param connectionFactory factory used to open the connection
     * @param queue             destination queue
     * @throws Exception the failure raised while connecting, sending or committing
     */
    public void send(String msg,
                     ConnectionFactory connectionFactory,
                     Queue queue) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(queue);
            TextMessage message = session.createTextMessage(msg);
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            rollbackQuietly(session, e);
            LOG.error(e.getMessage(), e);
            throw e;
        } finally {
            // Nested finally so the connection is closed even if closing the session throws.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Rolls back {@code session} if it exists; a rollback failure is attached to
     * {@code cause} as a suppressed exception so it cannot hide the original error.
     */
    private static void rollbackQuietly(Session session, Exception cause) {
        if (session == null) {
            // The failure happened before a session was created; nothing to roll back.
            return;
        }
        try {
            session.rollback();
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        }
    }
}
Java EE 侦听器:
import java.io.*;
import javax.annotation.*;
import javax.ejb.*;
import javax.jms.*;
@MessageDriven ( name = "BetaTESTQMDB" , activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue = "javax.jms.Queue") ,
@ActivationConfigProperty(propertyName="destination", propertyValue = "BetaTESTQ"),
@ActivationConfigProperty(propertyName="maxSession", propertyValue = "5"),
@ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue = "Auto-acknowledge")
})
public class BetaTestQueueListener implements MessageListener, java.io.Serializable {
private static final long serialVersionUID = 1L;
@Resource(name="MyJmsConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource
private MessageDrivenContext mdbContext;
@Resource(name = "BETATESTQ")
private javax.jms.Queue betaTestQ;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void onMessage(Message rcvMessage) {
try {
jmsInstance.send("test message", connectionFactory, betaTestQ);
} catch (Throwable t) {
t.printStackTrace();
mdbContext.setRollbackOnly();
}
}
}
您似乎正在使用 OpenWire JMS 客户端。这会使情况变得复杂,因为据我所知,OpenWire JMS 客户端是在客户端自身实现重新投递的,相应的重新投递语义也是在客户端而不是代理端配置的。由于 OpenWire 客户端自行处理一切,并且不会把投递失败通知给代理,代理没有机会应用任何重新投递策略。This documentation 可以帮助您为 OpenWire 客户端配置重新投递。
话虽如此,我建议参考 this tutorial。它演示了如何在无需构建/部署 JCA RA 的情况下集成 TomEE 和 ActiveMQ Artemis。这样您就可以使用 Core JMS 客户端,而不是 OpenWire JMS 客户端。
如果你想走 RA 路线,可以在 examples/features/sub-modules/artemis-ra-rar/ 目录下运行 mvn verify 来构建 ActiveMQ Artemis JCA RA。构建出的 RA 位于 target 目录中,文件名为 artemis-rar-<version>.rar。