Mule 中同步 AMQP 连接器的问题
Problems with sync AMQP connector in Mule
我正在尝试以持久的方式处理 Mule 中的 XML 个对象队列,这些对象已从原始 xml 文件中分离出来,然后使用 "choice" 组件。
选择组件的每一段都通向具有不同队列的 AMQP 端点。每个队列的另一端是另一个 Mule 流,它应该读取队列,对 XML 和 return 做一些事情作为回复。所有 AMQP 端点都设置为请求-响应。
在将某些内容放入 AMQP 队列之前,流程似乎正常工作,但随后立即继续,而不是等待消息。
这也是远程流程的结果,它似乎正确读取了队列,但似乎在继续正确处理之前立即回复。最后它应该回复消息,但似乎没有回复。
这里有一些代码片段,以防有人指出我哪里出错了...
主要路由器流程
<amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector" validateConnections="true"/>
<flow name="routerFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" />
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/>
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" />
<choice doc:name="Route by Config Item Type">
<when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger message=""Back from DirectoryQueue with " + #[payload]" level="INFO" doc:name="Logger"/>
</when>
<when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/>
</when>
</choice>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>
目录队列流
<flow name="directoryobjectFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP">
<http:request-builder>
<http:header headerName="Content-Type" value="application/xml"/>
<http:header headerName="Accept" value="application/xml"/>
</http:request-builder>
</http:request>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/>
</flow>
和组策略队列(当前设置为不执行任何操作)
<flow name="amiab-esb-grouppolicyFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<set-payload doc:name="Set Payload" value="Nothing" />
<logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/>
</flow>
我是 Mule 的新手,刚刚开始掌握它,所以如果有任何想法或见解,我将不胜感激。
谢谢!
请使用今天发布的最新版连接器3.6.2。这将毫无问题地执行带有请求-响应端点的流程。
我正在尝试以持久的方式处理 Mule 中的 XML 个对象队列,这些对象已从原始 xml 文件中分离出来,然后使用 "choice" 组件。 选择组件的每一段都通向具有不同队列的 AMQP 端点。每个队列的另一端是另一个 Mule 流,它应该读取队列,对 XML 和 return 做一些事情作为回复。所有 AMQP 端点都设置为请求-响应。
在将某些内容放入 AMQP 队列之前,流程似乎正常工作,但随后立即继续,而不是等待消息。
这也是远程流程的结果,它似乎正确读取了队列,但似乎在继续正确处理之前立即回复。最后它应该回复消息,但似乎没有回复。
这里有一些代码片段,以防有人指出我哪里出错了...
主要路由器流程
<amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector" validateConnections="true"/>
<flow name="routerFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" />
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/>
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" />
<choice doc:name="Route by Config Item Type">
<when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger message=""Back from DirectoryQueue with " + #[payload]" level="INFO" doc:name="Logger"/>
</when>
<when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/>
</when>
</choice>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>
目录队列流
<flow name="directoryobjectFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP">
<http:request-builder>
<http:header headerName="Content-Type" value="application/xml"/>
<http:header headerName="Accept" value="application/xml"/>
</http:request-builder>
</http:request>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/>
</flow>
和组策略队列(当前设置为不执行任何操作)
<flow name="amiab-esb-grouppolicyFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<set-payload doc:name="Set Payload" value="Nothing" />
<logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/>
</flow>
我是 Mule 的新手,刚刚开始掌握它,所以如果有任何想法或见解,我将不胜感激。
谢谢!
请使用今天发布的最新版连接器3.6.2。这将毫无问题地执行带有请求-响应端点的流程。