使 WebMQ 同步
Making WebMQ Synchronous
我试图让 WebMQ 在 MULE 中同步运行,以便将消息队列转换为内部项目的 REST api。不幸的是,我对MULE知之甚少。
经过一番努力后,我开始了解请求-回复范围并尝试将 WMQ 连接器与它一起使用,结果发现 WMQ 仅在流程结束后才发送。
我试过使用 VM 将请求放入不同的流程来触发它
但到目前为止,这只会导致它在永远不会发生的 WMQ 上永远等待。
我也试过使用 VM 来回:
但它似乎做的事情和没有一样,只是忽略了输入。
现在我知道 WMQ 的输入和输出正在工作,因为我之前设置了一个使用 http 与自身通信并在消息通过时通知它的流程。
我本质上想要的是这个,但是使用 HTTP 而不是 AJAX
我最近的,也许是最接近的尝试是这样的:
XML 是:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" version="EE-3.6.0" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:ajax="http://www.mulesoft.org/schema/mule/ajax" xmlns:core="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio" xmlns:test="http://www.mulesoft.org/schema/mule/test" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:wmq="http://www.mulesoft.org/schema/mule/ee/wmq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ajax http://www.mulesoft.org/schema/mule/ajax/current/mule-ajax.xsd
http://www.mulesoft.org/schema/mule/ee/wmq http://www.mulesoft.org/schema/mule/ee/wmq/current/mule-wmq-ee.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/current/mule-stdio.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
<wmq:connector channel="MAGENTO.SVRCONN" doc:name="WMQ Connector" hostName="[ip]" name="wmqConnector" port="1414" queueManager="MAGENTO" transportType="CLIENT_MQ_TCPIP" validateConnections="true" />
<vm:connector name="VM" validateConnections="true" doc:name="VM">
<vm:queue-profile maxOutstandingMessages="500">
<default-persistent-queue-store/>
</vm:queue-profile>
</vm:connector>
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<flow name="Input">
<http:listener config-ref="HTTP_Listener_Configuration" path="/mq" doc:name="HTTP"/>
<set-payload value="#[message.inboundProperties.'http.query.params'.xml]" doc:name="Set Payload"/>
<logger message="Entering Request-Reply Payload: #[payload]" level="INFO" doc:name="Logger"/>
<request-reply doc:name="Request-Reply">
<vm:outbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_TriggerSend" />
<vm:inbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_Receive" />
</request-reply>
<logger message="Request-Reply has ended. Payload: #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="Send_Message">
<vm:inbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_Send"/>
<logger message="(Preparing to Dispatch) Payload: #[payload]" level="INFO" doc:name="Logger"/>
<wmq:outbound-endpoint queue="PUT_QUEUE" connector-ref="wmqConnector" doc:name="WMQ"/>
<logger message="Message presumably being dispatched after this log" level="INFO" doc:name="Logger"/>
</flow>
<flow name="Receive_Message">
<wmq:inbound-endpoint queue="GET_QUEUE" connector-ref="wmqConnector" doc:name="WMQ_Receive" />
<logger message="Triggering Receive" level="INFO" doc:name="Logger"/>
<vm:outbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_TriggerReceive" />
</flow>
</mule>
几乎可以正常工作,但它会无限期挂起,而不会向 HTTP 服务器发回响应。将 Request-Reply 流中的 vm:inbound-endpoint 设置为单向可以防止它挂起,但不会发送已收到的新有效负载,而是发送先前的有效负载(就像它被跳过一样)。
如有任何帮助,我们将不胜感激!
使异步数据源同步确实很棘手。但我真的没有看到你的请求-回复策略有问题。
尝试在两端使用带有 WMQ 端点的请求-回复。然后监控队列并查看消息和回复是否按预期到达。您是否对请求和回复使用相同的队列?最简单的方案是使用不同的方案,这对您的用例可行吗?
如果我理解正确的话,您只是想公开对 queue 的请求和对众所周知的 queue 的响应作为 REST api.
那里不需要 VM queue,作为 JMS 传输的 request-reply 元素也无法模拟请求响应交换模式。
JMS queues 只是一种方式。您必须在不同的 queue 上回复。 request-response 场景背后的逻辑是 outbound-endpoint 具有出站 属性:MULE_REPLYTO
。这个 属性 应该设置为你对 queue 的众所周知的回复的名称:GET_QUEUE(如果没有提供这样的 header,将创建一个临时的 queue ).
MULE_REPLY 到 header 将成为标准 JMSReplyTo
当在服务上收到消息时。服务 必须尊重 这个 header 将回复发送回那个 queue。如果服务是在 Mule 上实现的,这将自动发生,否则它可能不会发生。你应该仔细检查它是否受到尊重。
如果你想使用一个 request-reply 作用域到 one-way 个端点而不是一个 request-reponse 端点,没关系,应该工作相同但代码更多。
我试图让 WebMQ 在 MULE 中同步运行,以便将消息队列转换为内部项目的 REST api。不幸的是,我对MULE知之甚少。
经过一番努力后,我开始了解请求-回复范围并尝试将 WMQ 连接器与它一起使用,结果发现 WMQ 仅在流程结束后才发送。
我试过使用 VM 将请求放入不同的流程来触发它
但到目前为止,这只会导致它在永远不会发生的 WMQ 上永远等待。
我也试过使用 VM 来回:
但它似乎做的事情和没有一样,只是忽略了输入。
现在我知道 WMQ 的输入和输出正在工作,因为我之前设置了一个使用 http 与自身通信并在消息通过时通知它的流程。
我本质上想要的是这个,但是使用 HTTP 而不是 AJAX
我最近的,也许是最接近的尝试是这样的:
XML 是:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" version="EE-3.6.0" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:ajax="http://www.mulesoft.org/schema/mule/ajax" xmlns:core="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:stdio="http://www.mulesoft.org/schema/mule/stdio" xmlns:test="http://www.mulesoft.org/schema/mule/test" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:wmq="http://www.mulesoft.org/schema/mule/ee/wmq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/ajax http://www.mulesoft.org/schema/mule/ajax/current/mule-ajax.xsd
http://www.mulesoft.org/schema/mule/ee/wmq http://www.mulesoft.org/schema/mule/ee/wmq/current/mule-wmq-ee.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/stdio http://www.mulesoft.org/schema/mule/stdio/current/mule-stdio.xsd
http://www.mulesoft.org/schema/mule/test http://www.mulesoft.org/schema/mule/test/current/mule-test.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
<wmq:connector channel="MAGENTO.SVRCONN" doc:name="WMQ Connector" hostName="[ip]" name="wmqConnector" port="1414" queueManager="MAGENTO" transportType="CLIENT_MQ_TCPIP" validateConnections="true" />
<vm:connector name="VM" validateConnections="true" doc:name="VM">
<vm:queue-profile maxOutstandingMessages="500">
<default-persistent-queue-store/>
</vm:queue-profile>
</vm:connector>
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<flow name="Input">
<http:listener config-ref="HTTP_Listener_Configuration" path="/mq" doc:name="HTTP"/>
<set-payload value="#[message.inboundProperties.'http.query.params'.xml]" doc:name="Set Payload"/>
<logger message="Entering Request-Reply Payload: #[payload]" level="INFO" doc:name="Logger"/>
<request-reply doc:name="Request-Reply">
<vm:outbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_TriggerSend" />
<vm:inbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_Receive" />
</request-reply>
<logger message="Request-Reply has ended. Payload: #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="Send_Message">
<vm:inbound-endpoint exchange-pattern="one-way" path="mq" connector-ref="VM" doc:name="VM_Send"/>
<logger message="(Preparing to Dispatch) Payload: #[payload]" level="INFO" doc:name="Logger"/>
<wmq:outbound-endpoint queue="PUT_QUEUE" connector-ref="wmqConnector" doc:name="WMQ"/>
<logger message="Message presumably being dispatched after this log" level="INFO" doc:name="Logger"/>
</flow>
<flow name="Receive_Message">
<wmq:inbound-endpoint queue="GET_QUEUE" connector-ref="wmqConnector" doc:name="WMQ_Receive" />
<logger message="Triggering Receive" level="INFO" doc:name="Logger"/>
<vm:outbound-endpoint exchange-pattern="request-response" path="mq-return" connector-ref="VM" doc:name="VM_TriggerReceive" />
</flow>
</mule>
几乎可以正常工作,但它会无限期挂起,而不会向 HTTP 服务器发回响应。将 Request-Reply 流中的 vm:inbound-endpoint 设置为单向可以防止它挂起,但不会发送已收到的新有效负载,而是发送先前的有效负载(就像它被跳过一样)。
如有任何帮助,我们将不胜感激!
使异步数据源同步确实很棘手。但我真的没有看到你的请求-回复策略有问题。
尝试在两端使用带有 WMQ 端点的请求-回复。然后监控队列并查看消息和回复是否按预期到达。您是否对请求和回复使用相同的队列?最简单的方案是使用不同的方案,这对您的用例可行吗?
如果我理解正确的话,您只是想公开对 queue 的请求和对众所周知的 queue 的响应作为 REST api.
那里不需要 VM queue,作为 JMS 传输的 request-reply 元素也无法模拟请求响应交换模式。
JMS queues 只是一种方式。您必须在不同的 queue 上回复。 request-response 场景背后的逻辑是 outbound-endpoint 具有出站 属性:MULE_REPLYTO
。这个 属性 应该设置为你对 queue 的众所周知的回复的名称:GET_QUEUE(如果没有提供这样的 header,将创建一个临时的 queue ).
MULE_REPLY 到 header 将成为标准 JMSReplyTo
当在服务上收到消息时。服务 必须尊重 这个 header 将回复发送回那个 queue。如果服务是在 Mule 上实现的,这将自动发生,否则它可能不会发生。你应该仔细检查它是否受到尊重。
如果你想使用一个 request-reply 作用域到 one-way 个端点而不是一个 request-reponse 端点,没关系,应该工作相同但代码更多。