如何在 Mule4 中更新 ObjectStore 中 Key 的值?
How to update the value of a Key in ObjectStore in Mule4?
我的要求是,我需要统计一天处理的消息。
所以,我创建了一个子流,我在其中创建了一个持久对象库。我最初使用默认值 0 检索它。然后,我将其传递给 java AtomicInteger class 以在请求进入时每次递增。现在我存储新值。
<sub-flow name="impl-message-counter:\counter-set-flow" doc:id="929c5fa5-74a2-4e8e-b1a0-43209b57222c" >
<os:retrieve doc:name="Retrieve" doc:id="565ecf6c-ffba-491e-b2c9-35ef4b4f52ad" key="counter" objectStore="Object_store" target="count">
<os:default-value ><![CDATA[0]]></os:default-value>
</os:retrieve>
<java:new constructor="AtomicInteger(int)" doc:name="New" doc:id="f0e75c0b-914a-44ee-9e24-a597025afd77" class="java.util.concurrent.atomic.AtomicInteger" target="atomicinteger">
<java:args ><![CDATA[#[output application/java
---
{
arg0 : vars.count as Number {class: "java.lang.Integer"}
}]]]></java:args>
</java:new>
<java:invoke doc:name="Invoke" doc:id="60779082-ad71-4770-b871-8d86044aa62a" instance="#[vars.atomicinteger]" class="java.util.concurrent.atomic.AtomicInteger" method="incrementAndGet()" target="newVal"/>
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</sub-flow>
在上面的流程中,我想添加一个check processor,看看今天是不是过去了。如果它过了午夜,我想将它重新分配给 0。否则继续递增;
<choice doc:name="Choice" doc:id="120b1f05-7afe-4cfc-a103-0b26f9df7140" >
<when expression="#[now().hour==23 and now().minutes==59 and now().seconds==60]">
<os:store doc:name="Store" doc:id="3d995b22-a648-4c7f-af54-33d70b23705c" key="counter" objectStore="Object_store" >
<os:value ><![CDATA[#[0]]]></os:value>
</os:store>
</when>
<otherwise >
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</otherwise>
</choice>
但是上面的方法行得通吗?我认为如果相同的对象库中存在相同的密钥,它会跳过吗?如何更新现有密钥?
这是个好主意,但您必须意识到对象存储是不可靠的。如果您编写一个简单的测试并进行负载和压力测试,您可以检查它。特别是在集群中。
首先,通常它可以工作,但后来,尤其是在同时失败的并发请求上。因此,请谨慎使用它,不要依赖它。最糟糕的部分 - 它会打断你的心流。我们能做的是——
https://simpleflatservice.com/mule4/IgnoreUnreliableObjectStorage.html - 包装对对象存储的每次调用以尝试阻止并简单地忽略任何失败或意外结果。
这是代码
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:os="http://www.mulesoft.org/schema/mule/os"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<flow name="selftestFlow" doc:id="79a89806-adea-4017-a838-8b8c3e2002df" >
<http:listener doc:name="Listener" doc:id="63128e3f-34f5-4f04-9b8a-0bebe67f763c" config-ref="HTTP_Listener_config" path="/counter"/>
<try doc:name="Try" doc:id="d5d83aa3-b271-41b0-98a6-381823164de4" >
<os:retrieve doc:name="Retrieve" doc:id="f6d3cccd-a7be-44fa-a51d-36bddbc5892f" key="test" target="localTest">
<os:default-value><![CDATA[N/A]]></os:default-value>
</os:retrieve>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="b14ab2e4-c871-4af8-a488-9c70284a525f" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bc50c855-34b4-4661-9f42-2b18e441782d" >
<ee:message >
</ee:message>
<ee:variables >
<ee:set-variable variableName="localTest" ><![CDATA[%dw 2.0
output application/java
---
if (vars.localTest is Number) ( vars.localTest as Number + 1 ) else ( 1 as Number )]]></ee:set-variable>
</ee:variables>
</ee:transform>
<try doc:name="Try" doc:id="2d54c616-7fb6-41ba-bc05-0918199bcba4" >
<os:store doc:name="Store" doc:id="2847d9db-d2f6-4bc4-a0a5-dca0eafbdd49" key="test">
<os:value><![CDATA[#[vars.localTest]]]></os:value>
</os:store>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="90e6616a-af11-4af6-a82f-e9823f764ceb" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bdc2a164-3d80-427f-b95d-a7dfc2b69400" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{ counter: vars.localTest }]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
</mule>
我的要求是,我需要统计一天处理的消息。 所以,我创建了一个子流,我在其中创建了一个持久对象库。我最初使用默认值 0 检索它。然后,我将其传递给 java AtomicInteger class 以在请求进入时每次递增。现在我存储新值。
<sub-flow name="impl-message-counter:\counter-set-flow" doc:id="929c5fa5-74a2-4e8e-b1a0-43209b57222c" >
<os:retrieve doc:name="Retrieve" doc:id="565ecf6c-ffba-491e-b2c9-35ef4b4f52ad" key="counter" objectStore="Object_store" target="count">
<os:default-value ><![CDATA[0]]></os:default-value>
</os:retrieve>
<java:new constructor="AtomicInteger(int)" doc:name="New" doc:id="f0e75c0b-914a-44ee-9e24-a597025afd77" class="java.util.concurrent.atomic.AtomicInteger" target="atomicinteger">
<java:args ><![CDATA[#[output application/java
---
{
arg0 : vars.count as Number {class: "java.lang.Integer"}
}]]]></java:args>
</java:new>
<java:invoke doc:name="Invoke" doc:id="60779082-ad71-4770-b871-8d86044aa62a" instance="#[vars.atomicinteger]" class="java.util.concurrent.atomic.AtomicInteger" method="incrementAndGet()" target="newVal"/>
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</sub-flow>
在上面的流程中,我想添加一个check processor,看看今天是不是过去了。如果它过了午夜,我想将它重新分配给 0。否则继续递增;
<choice doc:name="Choice" doc:id="120b1f05-7afe-4cfc-a103-0b26f9df7140" >
<when expression="#[now().hour==23 and now().minutes==59 and now().seconds==60]">
<os:store doc:name="Store" doc:id="3d995b22-a648-4c7f-af54-33d70b23705c" key="counter" objectStore="Object_store" >
<os:value ><![CDATA[#[0]]]></os:value>
</os:store>
</when>
<otherwise >
<os:store doc:name="Store" doc:id="8f85aa4f-5bce-4248-8a3b-a8cde27a1307" key="counter" objectStore="Object_store">
<os:value><![CDATA[#[vars.newVal]]]></os:value>
</os:store>
</otherwise>
</choice>
但是上面的方法行得通吗?我认为如果相同的对象库中存在相同的密钥,它会跳过吗?如何更新现有密钥?
这是个好主意,但您必须意识到对象存储是不可靠的。如果您编写一个简单的测试并进行负载和压力测试,您可以检查它。特别是在集群中。
首先,通常它可以工作,但后来,尤其是在同时失败的并发请求上。因此,请谨慎使用它,不要依赖它。最糟糕的部分 - 它会打断你的心流。我们能做的是——
https://simpleflatservice.com/mule4/IgnoreUnreliableObjectStorage.html - 包装对对象存储的每次调用以尝试阻止并简单地忽略任何失败或意外结果。
这是代码
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:os="http://www.mulesoft.org/schema/mule/os"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<flow name="selftestFlow" doc:id="79a89806-adea-4017-a838-8b8c3e2002df" >
<http:listener doc:name="Listener" doc:id="63128e3f-34f5-4f04-9b8a-0bebe67f763c" config-ref="HTTP_Listener_config" path="/counter"/>
<try doc:name="Try" doc:id="d5d83aa3-b271-41b0-98a6-381823164de4" >
<os:retrieve doc:name="Retrieve" doc:id="f6d3cccd-a7be-44fa-a51d-36bddbc5892f" key="test" target="localTest">
<os:default-value><![CDATA[N/A]]></os:default-value>
</os:retrieve>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="b14ab2e4-c871-4af8-a488-9c70284a525f" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bc50c855-34b4-4661-9f42-2b18e441782d" >
<ee:message >
</ee:message>
<ee:variables >
<ee:set-variable variableName="localTest" ><![CDATA[%dw 2.0
output application/java
---
if (vars.localTest is Number) ( vars.localTest as Number + 1 ) else ( 1 as Number )]]></ee:set-variable>
</ee:variables>
</ee:transform>
<try doc:name="Try" doc:id="2d54c616-7fb6-41ba-bc05-0918199bcba4" >
<os:store doc:name="Store" doc:id="2847d9db-d2f6-4bc4-a0a5-dca0eafbdd49" key="test">
<os:value><![CDATA[#[vars.localTest]]]></os:value>
</os:store>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="90e6616a-af11-4af6-a82f-e9823f764ceb" />
</error-handler>
</try>
<ee:transform doc:name="Transform Message" doc:id="bdc2a164-3d80-427f-b95d-a7dfc2b69400" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{ counter: vars.localTest }]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
</mule>