ActiveMQ Artemis 2.10.1 忽略 TomEE 中的重试设置

ActiveMQ Artemis 2.10.1 ignoring retry settings in TomEE

我下载了最新的 ActiveMQ Artemis 2.10.1(Windows 10,JDK 8),但无法使 address-settings 生效。阅读了在线文档(内容不多)之后,我在 broker.xml 中添加了以下内容:

<!-- Delivery policy applied to addresses matching "BETATESTQ". -->
<address-settings>
    <address-setting match="BETATESTQ">
        <!-- Dead-letter and expiry targets; both names are also declared as addresses in this broker.xml. -->
        <dead-letter-address>BETATESTQ_DLQ</dead-letter-address>
        <expiry-address>BETATESTQ_EXPIRY</expiry-address>
        <!-- Redelivery tuning. NOTE(review): the delay values are presumably milliseconds; confirm against the Artemis address-settings documentation. -->
        <redelivery-delay>30000</redelivery-delay>
        <redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>         
        <redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
        <max-redelivery-delay>100000</max-redelivery-delay>
        <!-- Upper bound on delivery attempts configured for this address. -->
        <max-delivery-attempts>999</max-delivery-attempts>      
    </address-setting>   
</address-settings>

<!-- Declares three anycast addresses, each with a single queue of the same name. -->
<addresses>
    <!-- Referenced as the dead-letter-address in the BETATESTQ address-setting. -->
    <address name="BETATESTQ_DLQ">
        <anycast>
            <queue name="BETATESTQ_DLQ" />
        </anycast>
    </address>   
    <!-- Referenced as the expiry-address in the BETATESTQ address-setting. -->
    <address name="BETATESTQ_EXPIRY">
        <anycast>
            <queue name="BETATESTQ_EXPIRY" />
        </anycast>
    </address>   
    <!-- The application queue that the address-setting above is matched against. -->
    <address name="BETATESTQ">
        <anycast>
            <queue name="BETATESTQ" />
        </anycast>
    </address>  
</addresses>

broker.xml 中的其余内容均为创建代理时生成的默认值。

redelivery-delay、max-redelivery-delay 和 max-delivery-attempts 似乎总是使用默认值。不过它确实正确读取了死信地址和过期地址的值。我可以在 TomEE 日志中看到消息被重试,这些消息也显示在 Artemis 控制台中,并在重试结束后被移至死信队列。

最初将数据放入队列时,我没有传入任何与投递或重试相关的数据(只使用了最少量的 Java J2EE 代码)。

如何让它不忽略 redelivery-delay、max-redelivery-delay 和 max-delivery-attempts?

我使用 Apache TomEE 连接到队列(未使用 TomEE 中的内置 ActiveMQ 5.x,而是将其指向 ActiveMQ Artemis)

JMS 侦听器:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.*;

@MessageDriven(name = "BETATESTQ", activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "BETATESTQ"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") 
})
public class BetaTestQueueListener implements MessageListener, java.io.Serializable {
    private static final long serialVersionUID = 1L;     

    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void onMessage(Message rcvMessage) {
        System.out.println("omMessage throw runtime exception");          
        throw new RuntimeException("trigger retry");
    }
}

12/20/19 - 我找到的适用于 TomEE 8.0.0 的可用配置:

tomee.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  TomEE resource configuration that points TomEE at an external ActiveMQ Artemis broker
  on port 61617 with SSL enabled.
  NOTE(review): user names and keystore/truststore passwords are stored in plain text
  here; confirm whether they should be externalized.
-->
<tomee>

    <!-- Artemis JCA resource adapter using the Netty connector. -->
    <Resource id="artemis" class-name="org.apache.activemq.artemis.ra.ActiveMQResourceAdapter">
        ConnectorClassName=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
        ConnectionParameters=host=127.0.0.1;port=61617;needClientAuth=false;sslEnabled=true;keyStorePath=../ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=../ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
        UserName=admin
        Password=admin
        JndiParams=java.naming.factory.initial=org.apache.openejb.core.OpenEJBInitialContextFactory
    </Resource>

    <!-- XA connection factory built from (uri, username, password). -->
    <Resource id="MyJmsConnectionFactory"
              class-name="org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory"
              constructor="uri,username,password"
              type="javax.jms.ConnectionFactory">
        uri=tcp://localhost:61617?needClientAuth=false;sslEnabled=true;keyStorePath=C:/apache-tomee-plus-8.0.0/ssl/server_jks_keystore.jks;keyStorePassword=mypassword;trustStorePath=C:/apache-tomee-plus-8.0.0/ssl/client_jks_truststore.jks;trustStorePassword=mypassword;trustAll=true;verifyHost=false;wantClientAuth=false;keyStoreProvider=JKS;trustStoreProvider=JKS
        username=admin
        password=admin
        TransactionSupport=xa
        PoolMaxSize=20
        PoolMinSize=0
    </Resource>

    <!-- Queue resources created through ActiveMQJMSClient.createQueue(name). -->
    <Resource id="BETATESTQ_DLQ"
              class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
              constructor="name"
              factory-name="createQueue"
              type="javax.jms.Queue">
        name=BETATESTQ_DLQ
    </Resource>
    <Resource id="BETATESTQ"
              class-name="org.apache.activemq.artemis.api.jms.ActiveMQJMSClient"
              constructor="name"
              factory-name="createQueue"
              type="javax.jms.Queue">
        name=BETATESTQ
    </Resource>

    <!-- Message-driven bean container bound to the "artemis" resource adapter above. -->
    <Container id="mdb" type="MESSAGE">
        InstanceLimit = -1
        ResourceAdapter = artemis
        ActivationSpecClass = org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec
    </Container>
</tomee>

发送 JMS 消息的 Java EE 类:

import java.util.*;
import javax.jms.*;
import org.slf4j.*;

/**
 * Minimal JMS publisher that sends one text message per call inside a locally
 * transacted session.
 */
public final class JmsPublisherInstance2 implements java.io.Serializable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(JmsPublisherInstance2.class);

   /**
    * Sends {@code msg} as a {@link TextMessage} to {@code queue} and commits the session.
    *
    * <p>On failure the session is rolled back (when one was created), the error is
    * logged and the original exception is rethrown. The session and connection are
    * always closed.
    *
    * @param msg               text payload of the message
    * @param connectionFactory factory used to open the connection
    * @param queue             destination queue
    * @throws Exception the failure raised while connecting, sending or committing;
    *                   a failure of the rollback itself is attached as a suppressed exception
    */
   public void send( String msg,
                     ConnectionFactory connectionFactory,
                     Queue queue ) throws Exception {
      Connection connection = null;
      Session session = null;
      try {
         connection = connectionFactory.createConnection();
         connection.start();
         session = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(queue);
         TextMessage message = session.createTextMessage(msg);
         producer.send(message);
         session.commit();
      } catch (Exception e) {
         // The session is still null when createConnection()/createSession() failed;
         // rolling back unconditionally would raise a NullPointerException that hides e.
         if (session != null) {
            try {
               session.rollback();
            } catch (Exception rollbackFailure) {
               // Keep the original failure as the primary exception.
               e.addSuppressed(rollbackFailure);
            }
         }
         LOG.error(e.getMessage(), e);
         throw e;
      } finally {
         // Nested finally so the connection is closed even if closing the session throws.
         try {
            if (session != null) {
               session.close();
            }
         } finally {
            if (connection != null) {
               connection.close();
            }
         }
      }
   }
}

Java EE 侦听器:

import java.io.*;
import javax.annotation.*;
import javax.ejb.*;
import javax.jms.*;

@MessageDriven ( name = "BetaTESTQMDB" , activationConfig = {
    @ActivationConfigProperty(propertyName="destinationType", propertyValue = "javax.jms.Queue") ,
    @ActivationConfigProperty(propertyName="destination", propertyValue = "BetaTESTQ"),
    @ActivationConfigProperty(propertyName="maxSession", propertyValue = "5"),
    @ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue = "Auto-acknowledge")
})

public class BetaTestQueueListener implements MessageListener, java.io.Serializable { 
      private static final long serialVersionUID = 1L;

      @Resource(name="MyJmsConnectionFactory")
      private ConnectionFactory connectionFactory;

      @Resource
      private MessageDrivenContext mdbContext;

      @Resource(name = "BETATESTQ") 
      private javax.jms.Queue betaTestQ;      

      @TransactionAttribute(TransactionAttributeType.REQUIRED)
      public void onMessage(Message rcvMessage) {
            try {
                jmsInstance.send("test message", connectionFactory, betaTestQ);
            } catch (Throwable t) {
                t.printStackTrace();
                mdbContext.setRollbackOnly();
            }
      }
}

您似乎正在使用 OpenWire JMS 客户端。这使事情变得复杂,因为据我所知,OpenWire JMS 客户端是在客户端自身实现重新投递的,其重新投递语义也是在客户端而不是代理端配置的。由于 OpenWire 客户端自行处理一切,并且不会把投递失败通知给代理,代理没有机会应用任何重新投递策略。这份文档可以帮助您为 OpenWire 客户端配置重新投递。

话虽如此,我建议参考这篇教程。它演示了如何在无需构建/部署 JCA RA 的情况下集成 TomEE 和 ActiveMQ Artemis。这样您就可以使用 Core JMS 客户端,而不是 OpenWire JMS 客户端。

如果你想走 RA 路线,可以在 examples/features/sub-modules/artemis-ra-rar/ 目录下运行 mvn verify 来构建 ActiveMQ Artemis JCA RA。构建出的 RA 位于 target 目录中,文件名为 artemis-rar-<version>.rar。