Mule JMS 主题和 ActiveMQ 配置
Mule JMS Topic and ActiveMQ Configuration
我正在使用 Mule ESB 设计一个流程,通过该流程可以 post 向主题发送消息。订阅者将收听主题并接收消息。每个订阅者将以不同的方式处理消息。此处的目标是能够 post 从 HTTP 向主题发送测试消息以测试订阅者。
这是我配置 JMS 连接的方式:
<!-- JMS Topic connector -->
<jms:activemq-connector name="jmsTopicConnection" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ2" durable="true" numberOfConcurrentTransactedReceivers="2"/>
这是流程:
<flow name="auditJMSServiceFlow">
<http:listener config-ref="HTTP" path="/Audit/Activity" responseStreamingMode="ALWAYS" doc:name="HTTP"/>
<set-variable variableName="#['id']" value="#[message.inboundProperties['id']]" doc:name="set dynamic id"/>
<set-payload value="===TOPIC===" doc:name="Set Payload" />
<request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
<jms:transaction action="ALWAYS_BEGIN" />
<!-- Not required to explicitly have this element. Mule will put this in implicitly. -->
<!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>
<json:object-to-json-transformer doc:name="transform JMS message to JSON"/>
<json:validate-schema schemaLocation="resource://AuditMsgSchema.json" doc:name="Validate Json Schema"/>
<component class="com.baml.panther.audit.service.impl.AuditServiceImpl" doc:name="Java"/>
<default-exception-strategy>
<commit-transaction exception-pattern="com.foo.ExpectedExceptionType"/>
<jms:outbound-endpoint queue="dead.letter" connector-ref="jmsConnection">
<jms:transaction action="JOIN_IF_POSSIBLE" />
</jms:outbound-endpoint>
</default-exception-strategy>
<logger message="=== #[message.payload] received #[org.mule.util.DateUtiles.getTimeStamp('dd-MM-yyyy_HH-mm-ss.SSS')]" level="INFO" doc:name="Logger"/>
当我 运行 通过测试时,出现以下错误:
如有任何建议,我们将不胜感激。
拉斯
问题是您不能将消息源作为请求-回复中的第一个消息处理器。请求回复允许您对异步协议(如 JMS)进行一种同步调用。
如果您想在放置请求-回复的位置向消息代理发送消息,只需放置一个 JMS 出站端点。
如果您想做的是使用来自 JMS 主题的消息,您必须将 JMS 入站端点作为流中的第一个消息处理器。
对于错误:您的 request-reply
范围缺少出站端点。您只有入站端点 (jms:inbound-endpoint)。您还需要提供出站端点。
<request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
<jms:transaction action="ALWAYS_BEGIN" />
<!-- Not required to explicitly have this element. Mule will put this in implicitly. -->
<!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>
不确定您的目标是什么,但如果您只放置一个 jms:outbound-enpoint
(而不是整个请求-回复块),您可以向 JMS 主题发送消息。
我正在使用 Mule ESB 设计一个流程,通过该流程可以 post 向主题发送消息。订阅者将收听主题并接收消息。每个订阅者将以不同的方式处理消息。此处的目标是能够 post 从 HTTP 向主题发送测试消息以测试订阅者。
这是我配置 JMS 连接的方式:
<!-- JMS Topic connector -->
<jms:activemq-connector name="jmsTopicConnection" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ2" durable="true" numberOfConcurrentTransactedReceivers="2"/>
这是流程:
<flow name="auditJMSServiceFlow">
<http:listener config-ref="HTTP" path="/Audit/Activity" responseStreamingMode="ALWAYS" doc:name="HTTP"/>
<set-variable variableName="#['id']" value="#[message.inboundProperties['id']]" doc:name="set dynamic id"/>
<set-payload value="===TOPIC===" doc:name="Set Payload" />
<request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
<jms:transaction action="ALWAYS_BEGIN" />
<!-- Not required to explicitly have this element. Mule will put this in implicitly. -->
<!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>
<json:object-to-json-transformer doc:name="transform JMS message to JSON"/>
<json:validate-schema schemaLocation="resource://AuditMsgSchema.json" doc:name="Validate Json Schema"/>
<component class="com.baml.panther.audit.service.impl.AuditServiceImpl" doc:name="Java"/>
<default-exception-strategy>
<commit-transaction exception-pattern="com.foo.ExpectedExceptionType"/>
<jms:outbound-endpoint queue="dead.letter" connector-ref="jmsConnection">
<jms:transaction action="JOIN_IF_POSSIBLE" />
</jms:outbound-endpoint>
</default-exception-strategy>
<logger message="=== #[message.payload] received #[org.mule.util.DateUtiles.getTimeStamp('dd-MM-yyyy_HH-mm-ss.SSS')]" level="INFO" doc:name="Logger"/>
当我 运行 通过测试时,出现以下错误:
如有任何建议,我们将不胜感激。
拉斯
问题是您不能将消息源作为请求-回复中的第一个消息处理器。请求回复允许您对异步协议(如 JMS)进行一种同步调用。 如果您想在放置请求-回复的位置向消息代理发送消息,只需放置一个 JMS 出站端点。 如果您想做的是使用来自 JMS 主题的消息,您必须将 JMS 入站端点作为流中的第一个消息处理器。
对于错误:您的 request-reply
范围缺少出站端点。您只有入站端点 (jms:inbound-endpoint)。您还需要提供出站端点。
<request-reply storePrefix="mainFlow">
<jms:inbound-endpoint topic="Audit.Activity" connector-ref="jmsTopicConnection" doc:name="JMS Topic Audit.Activity" exchange-pattern="request-response" durableName="audit_activity">
<jms:transaction action="ALWAYS_BEGIN" />
<!-- Not required to explicitly have this element. Mule will put this in implicitly. -->
<!-- <jms:jmsmessage-to-object-transformer displayName="JmsMsg to Object"/> -->
</jms:inbound-endpoint>
</request-reply>
不确定您的目标是什么,但如果您只放置一个 jms:outbound-enpoint
(而不是整个请求-回复块),您可以向 JMS 主题发送消息。