ActiveMQ 消息出列但未被消费

ActiveMQ messages dequeud but not consumed

我有一个 JBoss Web 应用程序,目前正在使用嵌入式 HornetQ for JMS。我们想切换到 ActiveMQ HA 集群,但我 运行 遇到了一些奇怪的问题。我的队列之一 (periodicDerivationQueue) 的行为与 HornetQ 不同。 AMQ 控制台显示消息已入队和出队,但它们没有到达我的消费者。起初,我假设消息出于某种原因正在出列到 DLQ 中,但事实似乎并非如此。据我了解,除非有必要,否则 AMQ 不会创建 DLQ。当我查看经纪人时,没有 DLQ。我怎样才能知道我的消息要去哪里?

由于反射,我也无法从堆栈的应用程序端进行调试。我想在 AMQ 端设置一个断点以查看我的消息发生了什么,但我不确定将它放在哪里。这里有什么想法吗?

这可能是序列化问题吗?我听说有时 JMS 代理之间的序列化差异会导致奇怪的行为。

我真的被困在这里了,我们将不胜感激。请参阅下面的配置信息。

野蝇 8.2

AMQ 5.13

消费者(此处未显示消息)

public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler {

protected DerivationService derivationService;
protected DerivationModelService derivationModelService;

protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void executeDerivation(PeriodicDerivation params) throws Exception{

    JbpmHibernateUtil.openSession();
    Derivation derivation = null;
    try{            

        if (params.isGroup()){
            derivation = new GroupDerivation();

            GroupQueryParameters qp = new GroupQueryParameters();
            qp.setGroupName(params.getItemName());
            derivation.setDerivedItem(derivationModelService.getGroup(qp));

        }else{
            derivation = new DeterminantDerivation();

            DeterminantQueryParameters qp = new DeterminantQueryParameters();
            qp.setDeterminantName(params.getItemName());            
            derivation.setDerivedItem(derivationModelService.getDeterminant(qp));

        }

        logger.info("Executing periodic derivation [" + derivation + "]");

        derivation.setModelEffectiveDate(new DateTime());
        derivation.setPeriod(params.getPeriod());
        derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true));
        derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL);
        derivationService.executeDerivation(derivation);

        JbpmHibernateUtil.closeSession(true);
    }catch(Exception e){
        logger.error("Periodic derivation execution failed for [" + derivation + "]",e);
        JbpmHibernateUtil.closeSession(false);
        throw new Exception("Periodic derivation execution failed for [" + derivation + "]",e);
    }
}

public DerivationService getDerivationService() {
    return derivationService;
}

public void setDerivationService(DerivationService derivationService) {
    this.derivationService = derivationService;
}

public DerivationModelService getDerivationModelService() {
    return derivationModelService;
}

public void setDerivationModelService(DerivationModelService derivationModelService) {
    this.derivationModelService = derivationModelService;
}

}

消费者XML 配置

<int:gateway id="periodicDerivationExecutionGateway"
        service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler">
        <int:method name="executeDerivation" request-channel="periodicDerivationChannel" />
    </int:gateway>

    <bean id="periodicDerivationExecutor"
        class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl">
        <property name="derivationService" ref="derivationService" />
        <property name="derivationModelService" ref="derivationModelService" />
    </bean>

    <int:service-activator input-channel="periodicDerivationChannel"
        ref="periodicDerivationExecutor" method="executeDerivation" />

    <int-jms:channel id="periodicDerivationChannel"
        queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}"
        task-executor="periodicDerivationTaskExecutor" />

ActiveMQ Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
        <resource-adapters>
            <resource-adapter id="activemq-rar.rar">
                <archive>
                    activemq-rar.rar
                </archive>
                <transaction-support>XATransaction</transaction-support>
                <config-property name="ServerUrl">
                    tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true
                </config-property>
                <config-property name="UserName">
                    admin
                </config-property>
                <config-property name="Password">
                    admin
                </config-property>
                <connection-definitions>

                    <connection-definition 
                       class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" 
                       jndi-name="java:/ConnectionFactory"  
           enabled="true" 
           pool-name="ConnectionFactory"> 
                        <xa-pool>
                            <min-pool-size>1</min-pool-size>
                            <max-pool-size>20</max-pool-size>
                            <prefill>false</prefill>
                            <is-same-rm-override>false</is-same-rm-override>
                        </xa-pool>
                        <recovery>
                            <recover-credential>
                                <user-name>admin</user-name>
                                <password>admin</password>
                            </recover-credential>
                            <recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin">
                                <config-property name="EnableIsValid">
                                    false
                                </config-property>
                                <config-property name="IsValidOverride">
                                    true
                                </config-property>
                                <config-property name="EnableClose">
                                    true
                                </config-property>
                            </recover-plugin>
                        </recovery>
                    </connection-definition>

                </connection-definitions>

Queues/Topics

<admin-objects>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue" 
                                use-java-context="true" 
                                pool-name="deferredBpmCommandQueue">
                        <config-property name="PhysicalName">
                            deferredBpmCommandQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue" 
                                use-java-context="true" 
                                pool-name="ActiveMQQueue.asyncActionRequestQueue">
                        <config-property name="PhysicalName">
                            asyncActionRequestQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
                                jndi-name="java:jboss/exported/jms/queue/bpm/DLQ" 
                                use-java-context="true" 
                                pool-name="DLQ">
                        <config-property name="PhysicalName">
                            DLQ
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue">
                        <config-property name="PhysicalName">
                            cacheUpdateReplicationQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue">
                        <config-property name="PhysicalName">
                            periodicDerivationQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue">
                        <config-property name="PhysicalName">
                            asyncServiceSignalQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic">
                        <config-property name="PhysicalName">
                            processEventTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue">
                        <config-property name="PhysicalName">
                            asyncActionReplyQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue">
                        <config-property name="PhysicalName">
                            ExpiryQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic">
                        <config-property name="PhysicalName">
                            asyncActionServiceStatusRequestTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic">
                        <config-property name="PhysicalName">
                            asyncActionAffinityRequestTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue">
                        <config-property name="PhysicalName">
                            jbpmJobQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic">
                        <config-property name="PhysicalName">
                            asyncActionAffinityReplyTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic">
                        <config-property name="PhysicalName">
                            cacheUpdateReplicationEventTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic">
                        <config-property name="PhysicalName">
                            asyncActionServiceStatusTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue">
                        <config-property name="PhysicalName">
                            asyncActionServiceLogRecordQueue
                        </config-property>
                    </admin-object>
                </admin-objects>

经纪人配置

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <value>file:${activemq.conf}/credentials.properties</value>
    </property>
</bean>


<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
      lazy-init="false" scope="singleton"
      init-method="start" destroy-method="stop">
</bean>

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true"> 

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" >

              <pendingMessageLimitStrategy>
                <constantPendingMessageLimitStrategy limit="1000"/>
              </pendingMessageLimitStrategy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>


    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>


    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter>

      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>


    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>

</broker>
<import resource="jetty.xml"/>

大黄蜂Q Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:messaging:2.0">
        <hornetq-server>
            <persistence-enabled>false</persistence-enabled>
            <jmx-management-enabled>true</jmx-management-enabled>
            <shared-store>true</shared-store>
            <journal-type>ASYNCIO</journal-type>
            <journal-file-size>102400</journal-file-size>
            <journal-min-files>2</journal-min-files>

            <connectors>
                <netty-connector name="netty" socket-binding="messaging"/>
                <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                    <param key="batch-delay" value="50"/>
                </netty-connector>
                <in-vm-connector name="in-vm" server-id="0"/>
            </connectors>

            <acceptors>
                <netty-acceptor name="netty" socket-binding="messaging"/>
                <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                    <param key="batch-delay" value="50"/>
                    <param key="direct-deliver" value="false"/>
                </netty-acceptor>
                <in-vm-acceptor name="in-vm" server-id="0"/>
            </acceptors>

            <security-settings>
                <security-setting match="#">
                    <permission type="send" roles="guest"/>
                    <permission type="consume" roles="guest"/>
                    <permission type="createNonDurableQueue" roles="guest"/>
                    <permission type="deleteNonDurableQueue" roles="guest"/>
                </security-setting>
            </security-settings>

            <address-settings>
                <address-setting match="#">
                    <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                    <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                    <redelivery-delay>0</redelivery-delay>
                    <max-size-bytes>104857600</max-size-bytes>
                    <page-size-bytes>10485760</page-size-bytes>
                    <page-max-cache-size>10</page-max-cache-size>
                    <address-full-policy>PAGE</address-full-policy>
                    <message-counter-history-day-limit>10</message-counter-history-day-limit>
                </address-setting>
            </address-settings>

            <jms-connection-factories>
                <connection-factory name="InVmConnectionFactory">
                    <connectors>
                        <connector-ref connector-name="in-vm"/>
                    </connectors>
                    <entries>
                        <entry name="java:/ConnectionFactory"/>
                    </entries>
                </connection-factory>
                <connection-factory name="RemoteConnectionFactory">
                    <connectors>
                        <connector-ref connector-name="netty"/>
                    </connectors>
                    <entries>
                        <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                    </entries>
                    <client-failure-check-period>30000</client-failure-check-period>
                    <connection-ttl>300000</connection-ttl>
                    <retry-interval>2000</retry-interval>
                    <retry-interval-multiplier>1</retry-interval-multiplier>
                    <max-retry-interval>2000</max-retry-interval>
                    <reconnect-attempts>100</reconnect-attempts>
                </connection-factory>
            </jms-connection-factories>

Queues/Topics

<jms-destinations>
                <jms-queue name="asyncActionRequestQueue">
                    <entry name="queue/bpm/asyncActionRequestQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/>
                </jms-queue>
                <jms-queue name="asyncActionReplyQueue">
                    <entry name="queue/bpm/asyncActionReplyQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/>
                </jms-queue>
                <jms-queue name="asyncServiceSignalQueue">
                    <entry name="queue/bpm/asyncServiceSignalQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/>
                </jms-queue>
                <jms-queue name="asyncActionServiceLogRecordQueue">
                    <entry name="queue/bpm/asyncActionServiceLogRecordQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/>
                </jms-queue>
                <jms-queue name="deferredBpmCommandQueue">
                    <entry name="queue/bpm/deferredBpmCommandQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/>
                </jms-queue>
                <jms-queue name="jbpmJobQueue">
                    <entry name="queue/bpm/jbpmJobQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/>
                </jms-queue>
                <jms-queue name="DLQ">
                    <entry name="queue/DLQ"/>
                    <entry name="java:jboss/exported/jms/queue/DLQ"/>
                </jms-queue>
                <jms-queue name="ExpiryQueue">
                    <entry name="queue/ExpiryQueue"/>
                    <entry name="java:jboss/exported/jms/queue/ExpiryQueue"/>
                </jms-queue>
                <jms-queue name="periodicDerivationQueue">
                    <entry name="queue/bpm/periodicDerivationQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/>
                </jms-queue>
                <jms-queue name="cacheUpdateReplicationQueue">
                    <entry name="queue/bpm/cacheUpdateReplicationQueue"/>
                    <entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/>
                </jms-queue>
                <jms-topic name="asyncActionServiceStatusTopic">
                    <entry name="topic/bpm/asyncActionServiceStatusTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionServiceStatusRequestTopic">
                    <entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionAffinityRequestTopic">
                    <entry name="topic/bpm/asyncActionAffinityRequestTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/>
                </jms-topic>
                <jms-topic name="asyncActionAffinityReplyTopic">
                    <entry name="topic/bpm/asyncActionAffinityReplyTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/>
                </jms-topic>
                <jms-topic name="processEventTopic">
                    <entry name="topic/bpm/processEventTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/>
                </jms-topic>
                <jms-topic name="cacheUpdateReplicationEventTopic">
                    <entry name="topic/bpm/cacheUpdateReplicationEventTopic"/>
                    <entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/>
                </jms-topic>
            </jms-destinations>

ObjectMessage 序列化安全问题。

ObjectMessage 对象依赖于 marshal/unmarshal 对象负载的 Java 序列化。这个过程通常被认为是不安全的,因为恶意负载可以利用主机系统。这就是为什么从版本 5.12.2 和 5.13.0 开始,ActiveMQ 强制用户将可以使用 ObjectMessages 交换的包明确列入白名单。

我几天前看到这个并添加了一个白名单,但它没有解决问题。我还针对 AMQ 5.11.3 尝试了 运行,但没有成功。显然他们也在 5.11.3 中添加了安全功能。无论如何,我将此 (-Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*") 添加到客户端和 AMQ vm 参数,现在一切正常。

请记住,我使用的命令行选项是安全的 我在我的代理中明确打开的漏洞,可以 允许恶意用户在我的系统上执行代码。正确的方法 使用该标志是明确列出您允许的 类 反序列化,或至多使用包通配符来避免显式列出 个人 类 和可信父包中的子包。