WildFly 10 Artemis 和 ActiveMQ 5.14 之间的 JMS 桥(ONCE_AND_ONLY_ONCE 服务质量)

JMS Bridge Between WildFly 10 Artemis and ActiveMQ 5.14 (ONCE_AND_ONLY_ONCE Quality of Service)

我正在尝试在 Artemis(WildFly 10 中的运行)和 ActiveMQ 5.14

之间建立一个 JMS 桥

总的来说,这个过程似乎很顺利,但不幸的是,我卡在了使 ONCE_AND_ONLY_ONCE QoS 工作所必需的 XA 配置上。

尽管消息确实到达了 ActiveMQ 端,但我在 WildFly 日志中收到以下错误:

11:25:57,920 WARN  [org.apache.activemq.artemis.jms.bridge] (Thread-97) AMQ342009: JMS Bridge failed to send + acknowledge batch, closing JMS objects: javax.jms.IllegalStateException: Not a transacted session
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:577)
at org.apache.activemq.ra.ManagedSessionProxy.commit(ManagedSessionProxy.java:108)
at org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl.sendBatchNonTransacted(JMSBridgeImpl.java:1291)
at org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl.sendBatch(JMSBridgeImpl.java:1251)
at org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl.access00(JMSBridgeImpl.java:75)
at org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl$BatchTimeChecker.run(JMSBridgeImpl.java:1794)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我的问题与这个类似(Wildfly 10 ONCE_AND_ONLY_ONCE JMS 桥),但不幸的是,该解决方案并不完全适用于我的情况,因为我的目标是 ActiveMQ 5.14。

从我读过的所有帖子来看,我似乎很清楚我应该确保应该为桥的两侧配置连接工厂以支持 XA。在 Artemis 方面,这似乎很简单:只需将 factory-type="XA_GENERIC" 添加到定义中即可。但是,我无法弄清楚如何在 ActiveMQ 端执行此操作。

这是来自我的独立 full.xml 的片段,它指定了我的消息传递子系统:

    <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
        <server name="default">
            <security-setting name="#">
                <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
            </security-setting>
            <address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
            <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
            <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
                <param name="batch-delay" value="50"/>
            </http-connector>
            <in-vm-connector name="in-vm" server-id="0"/>
            <http-acceptor name="http-acceptor" http-listener="default"/>
            <http-acceptor name="http-acceptor-throughput" http-listener="default">
                <param name="batch-delay" value="50"/>
                <param name="direct-deliver" value="false"/>
            </http-acceptor>
            <in-vm-acceptor name="in-vm" server-id="0"/>
            <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
            <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
            <jms-queue name="NonBridgedTestQueue" entries="java:jboss/exported/jms/queue/nonBridgedTestQueue"/>
            <jms-queue name="BridgedTestQueue" entries="java:jboss/exported/jms/queue/bridgedTestQueue"/>
            <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
            <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
            <connection-factory name="InVmXAConnectionFactory" factory-type="XA_GENERIC" entries="java:/XAConnectionFactory" connectors="in-vm"/>               
            <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
        </server>
        <jms-bridge name="simple-jms-bridge" add-messageID-in-header="true" max-batch-time="100" max-batch-size="10" max-retries="5" failure-retry-interval="10000" quality-of-service="ONCE_AND_ONLY_ONCE">
            <source destination="jboss/exported/jms/queue/bridgedTestQueue" connection-factory="java:/XAConnectionFactory"/>
            <target destination="jboss/activemq/queue/bridgedTestQueue" connection-factory="AMQConnectionFactory"/>
        </jms-bridge>
    </subsystem>

对于 ActiveMQ 定义,我使用了资源适配器,定义如下:

    <subsystem xmlns="urn:jboss:domain:resource-adapters:4.0">
        <resource-adapters>
            <resource-adapter id="activemq">
                <archive>activemq-rar-5.14.0.rar</archive>
                <transaction-support>XATransaction</transaction-support>
                <config-property name="ServerUrl">tcp://localhost:61616?jms.rmIdFromConnectionId=true</config-property>
                <config-property name="UserName">admin</config-property>
                <config-property name="UseInboundSession">false</config-property>
                <config-property name="Password">admin</config-property>
                <connection-definitions>
                    <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:/AMQConnectionFactory" enabled="true" pool-name="AMQConnectionFactory">
                        <xa-pool>
                            <min-pool-size>1</min-pool-size>
                            <max-pool-size>20</max-pool-size>
                            <prefill>false</prefill>
                            <is-same-rm-override>false</is-same-rm-override>
                        </xa-pool>
                    </connection-definition>
                </connection-definitions>
                <admin-objects>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/activemq/topic/TestTopic" use-java-context="true" pool-name="TestTopic">
                        <config-property name="PhysicalName">
                            activemq/topic/TestTopic
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/activemq/queue/TestQueue" use-java-context="true" pool-name="TestQueue">
                        <config-property name="PhysicalName">
                            activemq/queue/TestQueue
                        </config-property>
                    </admin-object>
                    <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/activemq/queue/bridgedTestQueue" use-java-context="true" pool-name="BridgedTestQueue">
                        <config-property name="PhysicalName">
                            activemq/queue/bridgedTestQueue
                        </config-property>
                    </admin-object>
                </admin-objects>
            </resource-adapter>
        </resource-adapters>
    </subsystem>

一些正确方向的指示将不胜感激

格雷格

发布这个问题后,我尝试了一些似乎有效的方法。仍然有兴趣听听这是否是正确的方法。

在查看资源适配器中的 ra.xml 文件时,我注意到它提供了一个额外的连接工厂作为管理对象。所以我在 standalone-full.xml:

中的资源适配器定义中添加了以下内容
                    <admin-object class-name="org.apache.activemq.ActiveMQXAConnectionFactory" jndi-name="java:jboss/activemq/activeMQXAConnectionFactory" use-java-context="true">
                        <config-property name="brokerURL">
                            tcp://localhost:61616?jms.rmIdFromConnectionId=true
                        </config-property>
                    </admin-object>

然后我使用这个连接工厂更新了 JMS 桥,如下所示:

        <jms-bridge name="simple-jms-bridge" add-messageID-in-header="true" max-batch-time="100" max-batch-size="10" max-retries="5" failure-retry-interval="10000" quality-of-service="ONCE_AND_ONLY_ONCE">
            <source destination="jboss/exported/jms/queue/bridgedTestQueue" connection-factory="java:/XAConnectionFactory"/>
            <target destination="jboss/activemq/queue/bridgedTestQueue" connection-factory="java:jboss/activemq/activeMQXAConnectionFactory"/>
        </jms-bridge>

现在似乎一切正常。消息到达 ActiveMQ 端,我在 WildFly 日志中没有遇到任何问题。呜呜呜