Mule 4:流:如何检查有效负载是否是 Mule 4 中的流?
Mule 4 : Streams : How to check if payload is a Stream in Mule 4?
我是 Mule 4 的新手并且关注 understanding/doubts 关于流。我将非常感谢您的帮助。
- mule 4 组件什么时候生成流?例如,数据库 select 可以 return 对象数组和单个值。那么在这两种情况下,return 有效载荷都会流式传输吗?
- 如何检查由 mule 4 组件 return 编辑的负载是否是流,如果是,如何分析流意味着在可重复文件存储流的情况下创建的文件在哪里以及如何很多负载已经作为流消耗了?
例如,我创建了以下 Mule 4 应用程序,该应用程序读取具有 100 万条记录的 CSV 文件并执行以下操作:
- 读取 CSV 文件[流媒体策略:可重复文件存储流,内存大小:512 KB]
- 对批量大小为 10k 的每个循环使用 a
- 在每个内部,一个将 csv 行转换为 Json 的转换消息和一个文件写入操作,该操作将根据 dw 代码创建名称为的文件:
p('destination.dir') ++ "Output_" ++ vars.counter ++ ".txt"
生成了 100 个文件,每个文件包含 10k 条记录。选中后,每个文件的大小为 913 KB。
预期:对于每个将处理 n 条记录,其中 n 条记录的大小为 512 KB,并在下一次迭代中处理 (batchsize-n)
实际:对于每批处理的每 10000 条记录。
它怎么发生的?
Mule 流代码:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<file:config name="File_Config_1" doc:name="File Config" doc:id="565a1655-beba-4048-942f-ae68887e9b96" />
<file:config name="File_Config" doc:name="File Config" doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7" />
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="dw-streamingFlow">
<http:listener doc:name="Listener" doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f" config-ref="HTTP_Listener_config" path="/app/fileStream"/>
<file:read path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
config-ref="File_Config"
outputMimeType="application/csv; streaming=true; header=true" />
<foreach doc:name="For Each" doc:id="eacd49dc-c49f-437a-927e-976add7e57fc" batchSize="500" collection="payload">
<ee:transform doc:name="Transform Message" doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map(item,index) -> {
"Country" : item."Country",
"FoodItems" : item."Item Type"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<file:write doc:name="Write" config-ref="File_Config_1" path='#["C:\Users\bbazazx\Documents\TestFolder\OutputDirectory\output" ++ vars.counter ++ ".json"]' />
<logger level="INFO" doc:name="Logger" message="#[payload]" />
</foreach>
</flow>
</mule>
这取决于每个连接器。 Mule 4 中的大多数连接器都允许流式传输,并且许多连接器允许配置流式传输策略。有关详细信息,请参阅 https://docs.mulesoft.com/mule-runtime/4.3/streaming-about。
可能是数据库连接器 returns 一个包含在流中的数组。 Mule 会理解它并将其作为一个数组透明地处理。
您可以在调试器中查看负载的 class 是一种流、可迭代还是托管游标。这似乎是流媒体的迹象。
提到的512 KB是缓冲区的大小。对于文件存储的可重复流策略,上面的文档链接对此进行了解释:
This strategy initially uses an in-memory buffer size of 512 KB. For
larger streams, the strategy creates a temporary file to the disk to
store the contents, without overflowing your memory.
我是 Mule 4 的新手并且关注 understanding/doubts 关于流。我将非常感谢您的帮助。
- mule 4 组件什么时候生成流?例如,数据库 select 可以 return 对象数组和单个值。那么在这两种情况下,return 有效载荷都会流式传输吗?
- 如何检查由 mule 4 组件 return 编辑的负载是否是流,如果是,如何分析流意味着在可重复文件存储流的情况下创建的文件在哪里以及如何很多负载已经作为流消耗了?
例如,我创建了以下 Mule 4 应用程序,该应用程序读取具有 100 万条记录的 CSV 文件并执行以下操作:
- 读取 CSV 文件[流媒体策略:可重复文件存储流,内存大小:512 KB]
- 对批量大小为 10k 的每个循环使用 a
- 在每个内部,一个将 csv 行转换为 Json 的转换消息和一个文件写入操作,该操作将根据 dw 代码创建名称为的文件:
p('destination.dir') ++ "Output_" ++ vars.counter ++ ".txt"
生成了 100 个文件,每个文件包含 10k 条记录。选中后,每个文件的大小为 913 KB。
预期:对于每个将处理 n 条记录,其中 n 条记录的大小为 512 KB,并在下一次迭代中处理 (batchsize-n) 实际:对于每批处理的每 10000 条记录。 它怎么发生的?
Mule 流代码:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<file:config name="File_Config_1" doc:name="File Config" doc:id="565a1655-beba-4048-942f-ae68887e9b96" />
<file:config name="File_Config" doc:name="File Config" doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7" />
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="dw-streamingFlow">
<http:listener doc:name="Listener" doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f" config-ref="HTTP_Listener_config" path="/app/fileStream"/>
<file:read path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
config-ref="File_Config"
outputMimeType="application/csv; streaming=true; header=true" />
<foreach doc:name="For Each" doc:id="eacd49dc-c49f-437a-927e-976add7e57fc" batchSize="500" collection="payload">
<ee:transform doc:name="Transform Message" doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map(item,index) -> {
"Country" : item."Country",
"FoodItems" : item."Item Type"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<file:write doc:name="Write" config-ref="File_Config_1" path='#["C:\Users\bbazazx\Documents\TestFolder\OutputDirectory\output" ++ vars.counter ++ ".json"]' />
<logger level="INFO" doc:name="Logger" message="#[payload]" />
</foreach>
</flow>
</mule>
这取决于每个连接器。 Mule 4 中的大多数连接器都允许流式传输,并且许多连接器允许配置流式传输策略。有关详细信息,请参阅 https://docs.mulesoft.com/mule-runtime/4.3/streaming-about。
可能是数据库连接器 returns 一个包含在流中的数组。 Mule 会理解它并将其作为一个数组透明地处理。
您可以在调试器中查看负载的 class 是一种流、可迭代还是托管游标。这似乎是流媒体的迹象。
提到的512 KB是缓冲区的大小。对于文件存储的可重复流策略,上面的文档链接对此进行了解释:
This strategy initially uses an in-memory buffer size of 512 KB. For larger streams, the strategy creates a temporary file to the disk to store the contents, without overflowing your memory.