Mule Request-Reply Scope 无法读取数据库请求的响应
Mule Request-Reply Scope failing to read response with database request
我一直在尝试构建一个有效地使用数据库作为队列的流程。这样做的原因是其他进程应该阅读和回复这条消息,并且是这样设计的。不幸的是,我无法控制其他进程,也无法使其响应不同的排队系统。
所以流程会像这样工作:将记录插入数据库的 HTTP 请求 -> 单独的应用程序(在 mule 之外)轮询此数据库 table 以获取消息并响应另一个将消息发送到不同的 table(此步骤可能需要 >5 秒才能响应)-> 读取此新行并响应原始 http 请求。
在这种设计中,请求-回复范围总是在等待回复出现时超时。 (我手动将其设置为 20 秒以快速显示)
Response timed out (20000ms) waiting for message response id "3e1a7750-ee13-11e6-ae40-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer
我显然遗漏了一些东西,似乎无法从 mule 中找到正确的文档。我希望本网站的好用户之一可以纠正我的方式错误。
以下是流程和视图示例
<flow name="mainFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/>
<cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/>
<request-reply doc:name="Request-Reply" timeout="20000">
<db:insert config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query>
</db:insert>
<jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS">
<jms:transaction action="JOIN_IF_POSSIBLE"/>
</jms:inbound-endpoint>
</request-reply>
<logger message="payload is #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="databasePoller">
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="5000"/>
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query>
</db:select>
</poll>
<foreach collection="#[payload]" doc:name="For Each">
<set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/>
<db:delete config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query>
</db:delete>
<set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/>
<message-properties-transformer overwrite="true" doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/>
<add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/>
</message-properties-transformer>
<set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/>
<jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/>
<logger level="INFO" doc:name="Logger"/>
</foreach>
</flow>
以下异常
消息:响应超时(20000 毫秒)等待消息响应 ID "b9a93d10-efa4-11e6-808b-0c9920524153" 或此操作被中断。无法通过端点路由事件:null。消息负载的类型为:整数
类型:org.mule.api.routing.ResponseTimeoutException
代码:MULE_ERROR--2
Java文档:http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html
有效载荷:1
根异常堆栈跟踪:
org.mule.api.routing.ResponseTimeoutException: 响应超时 (20000ms) 等待消息响应 id "b9a93d10-efa4-11e6-808b-0c9920524153" 或此操作被中断。无法通过端点路由事件:null。消息负载的类型为:整数
在 org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply(AbstractAsyncRequestReplyRequester.java:283)
在 org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:89)
我稍微简化了流程以使其正常运行。如果您不是在单独的流程中轮询数据库,而是使用 VM 队列触发它,这可能是一个合适的解决方案。
<flow name="mainFlow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="hello" doc:name="HTTP" />
<dw:transform-message doc:name="Transform Message">
<dw:set-payload>
<![CDATA[%dw 1.0 %output application/java
---
{
name: "abc",
euro: 130,
usd: 123
}]]></dw:set-payload>
</dw:transform-message>
<request-reply doc:name="Request-Reply" timeout="20000">
<vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM">
<message-properties-transformer scope="outbound"> <delete-message-property key="MULE_REPLYTO"/> </message-properties-transformer>
</vm:outbound-endpoint>
<vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</request-reply>
<logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="soFlow">
<vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/>
<db:insert config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into item (
item_name,
price_euro,
price_usd)
values (#[payload.name], #[payload.euro], #[payload.usd])]]> </db:parameterized-query>
</db:insert>
<vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
</flow>
<flow name="databasePoller">
<vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
<db:select config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select item_name,
price_euro,
price_usd from item]]></db:parameterized-query>
</db:select>
<vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</flow>
我相信我已经解决了这个问题。
我完全使用 HTTP 请求端点而不是数据库轮询器来完成这项工作。查看消息时,似乎有一个不同之处是通过我的数据库轮询器,mule 放置了 2 个额外的属性。 MULE_CORRELATION_SEQUENCE 和 MULE_CORRELATION_GROUP_SIZE.
通过在将消息发送到 jms 之前删除这些属性,允许请求-回复范围正确识别 jms 队列中的响应。
<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/>
<remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>
我一直在尝试构建一个有效地使用数据库作为队列的流程。这样做的原因是其他进程应该阅读和回复这条消息,并且是这样设计的。不幸的是,我无法控制其他进程,也无法使其响应不同的排队系统。
所以流程会像这样工作:将记录插入数据库的 HTTP 请求 -> 单独的应用程序(在 mule 之外)轮询此数据库 table 以获取消息并响应另一个将消息发送到不同的 table(此步骤可能需要 >5 秒才能响应)-> 读取此新行并响应原始 http 请求。
在这种设计中,请求-回复范围总是在等待回复出现时超时。 (我手动将其设置为 20 秒以快速显示)
Response timed out (20000ms) waiting for message response id "3e1a7750-ee13-11e6-ae40-0c9920524153" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: Integer
我显然遗漏了一些东西,似乎无法从 mule 中找到正确的文档。我希望本网站的好用户之一可以纠正我的方式错误。
以下是流程和视图示例
<flow name="mainFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/>
<cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/>
<request-reply doc:name="Request-Reply" timeout="20000">
<db:insert config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query>
</db:insert>
<jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS">
<jms:transaction action="JOIN_IF_POSSIBLE"/>
</jms:inbound-endpoint>
</request-reply>
<logger message="payload is #[payload]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="databasePoller">
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="5000"/>
<db:select config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query>
</db:select>
</poll>
<foreach collection="#[payload]" doc:name="For Each">
<set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/>
<db:delete config-ref="Oracle_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query>
</db:delete>
<set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/>
<message-properties-transformer overwrite="true" doc:name="Message Properties">
<add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/>
<add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/>
</message-properties-transformer>
<set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/>
<jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/>
<logger level="INFO" doc:name="Logger"/>
</foreach>
</flow>
以下异常
消息:响应超时(20000 毫秒)等待消息响应 ID "b9a93d10-efa4-11e6-808b-0c9920524153" 或此操作被中断。无法通过端点路由事件:null。消息负载的类型为:整数 类型:org.mule.api.routing.ResponseTimeoutException 代码:MULE_ERROR--2 Java文档:http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html 有效载荷:1
根异常堆栈跟踪: org.mule.api.routing.ResponseTimeoutException: 响应超时 (20000ms) 等待消息响应 id "b9a93d10-efa4-11e6-808b-0c9920524153" 或此操作被中断。无法通过端点路由事件:null。消息负载的类型为:整数 在 org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply(AbstractAsyncRequestReplyRequester.java:283) 在 org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process(AbstractAsyncRequestReplyRequester.java:89)
我稍微简化了流程以使其正常运行。如果您不是在单独的流程中轮询数据库,而是使用 VM 队列触发它,这可能是一个合适的解决方案。
<flow name="mainFlow">
<http:listener config-ref="HTTP_Listener_Configuration"
path="hello" doc:name="HTTP" />
<dw:transform-message doc:name="Transform Message">
<dw:set-payload>
<![CDATA[%dw 1.0 %output application/java
---
{
name: "abc",
euro: 130,
usd: 123
}]]></dw:set-payload>
</dw:transform-message>
<request-reply doc:name="Request-Reply" timeout="20000">
<vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM">
<message-properties-transformer scope="outbound"> <delete-message-property key="MULE_REPLYTO"/> </message-properties-transformer>
</vm:outbound-endpoint>
<vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</request-reply>
<logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/>
</flow>
<flow name="soFlow">
<vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/>
<db:insert config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[insert into item (
item_name,
price_euro,
price_usd)
values (#[payload.name], #[payload.euro], #[payload.usd])]]> </db:parameterized-query>
</db:insert>
<vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
</flow>
<flow name="databasePoller">
<vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/>
<db:select config-ref="MySQL_Configuration" doc:name="Database">
<db:parameterized-query><![CDATA[select item_name,
price_euro,
price_usd from item]]></db:parameterized-query>
</db:select>
<vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/>
</flow>
我相信我已经解决了这个问题。 我完全使用 HTTP 请求端点而不是数据库轮询器来完成这项工作。查看消息时,似乎有一个不同之处是通过我的数据库轮询器,mule 放置了 2 个额外的属性。 MULE_CORRELATION_SEQUENCE 和 MULE_CORRELATION_GROUP_SIZE.
通过在将消息发送到 jms 之前删除这些属性,允许请求-回复范围正确识别 jms 队列中的响应。
<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/>
<remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>