内存问题:解析大型 csv 文件转换为 json,然后使用 Mule 4.4 社区版调用第三方休息服务
Memory issues : parse a large csv file transform to json and then call a third party rest service using Mule 4.4 community edition
使用 Mule 4.4 社区版 - 内部部署
我有一个包含员工列表的大型 csv 文件:
101,John Saunders,19,M,Physics,Chemistry,Mechanics
102,Jim White,17,M,Languages,Art,Pottery
...
...
文件大小为700MB
此文件将被放置在一个 SFTP 位置,供 Mule 4.4 社区版读取。
注意 - 我需要解析数据并将其转换为 JSON 然后调用第三方休息服务
如果学生在任何科目中有 'Physics',我只需要获取学生详细信息
所以转换后的 JSON 看起来像这样:
[{"id":"101","name": "John Saunders","age": "19" , "gender": "M"},{...}]
现在我面临的挑战是 - 在读取文件后,我正在检查以确保文件不为空或为空,方法是:
sizeOf(payload) == 0
文件大小大于 1 MB 的代码本身在 Anypoint Studio 中失败并出现以下错误:
Internal execution exception while executing the script, this is most probably a bug, file an issue with the script
and the input data.
NOTE: Enable dump option using -Dcom.mulesoft.dw.dump_files=true
to log all required information.
Caused by:
org.mule.runtime.api.streaming.exception.StreamingBufferSizeExceededException: Buffer has exceeded its
maximum size of 1048576
evaluating expression: "sizeOf(payload) == 0
我可以尝试增加可用的堆内存(虽然目前我还不知道具体怎么做)但我的问题是:
我可以想到以下选项:
#1 使用批处理(不是一个选项,因为批处理在 mule 的社区版中不可用)
#2 增加堆内存(不想这样做,因为文件大小会进一步增加,此解决方案将无法扩展)
#3 将大文件分解成较小的文件然后处理
#4 想要学习和探索 mule 4 中流式传输/分块的任何选项
选项 #4 是我想要探索的。
在互联网上搜索,找到一篇文章 here
所以从 sftp 读取文件时 - 我不能使用 'Repeatable file-store stream' 但可以使用 'Repeatable in memory stream'
<sftp:read doc:name="Read Empfile" config-ref="SFTP_Config" path="\working\emp.csv" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
<repeatable-in-memory-stream />
</sftp:read>
这不会将整个文件读入内存吗?或者它只会读取文件的一部分?
这是我正在尝试的完整代码:
<flow name="load_emp" doc:id="81a7b13e-1d38-4722-9015-dd03e765023c" >
<sftp:read doc:name="Read emp file" doc:id="b8662f19-f442-4150-b74d-8339fedee72b" config-ref="SFTP_Config" path="\working\emp.unl" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
<repeatable-in-memory-stream />
</sftp:read>
<try doc:name="Try" doc:id="2a0c2d4c-23db-4d49-aa36-634720bb45b8" >
<choice doc:name="Choice" doc:id="e2b67582-f327-4974-9fb4-875af7962e6e" >
<when expression="sizeOf(payload) == 0">
<raise-error doc:name="Raise error" doc:id="290c67ca-4ae6-47f5-8d63-bd54cf122604" type="ANY" description="File is empty"/>
</when>
<otherwise >
<logger level="INFO" doc:name="not empty" doc:id="a68045a7-ba59-4c7a-915e-2145de5c3b4b" />
</otherwise>
</choice>
</try>
<set-payload value='#[output application/json
---
payload map (value,index)->{
"id": value.column_0,
"name": value.column_1
}]' doc:name="Set Payload" doc:id="17277532-f496-4a61-8d9f-136deaeb8104" />
<http:request method="POST" doc:name="Request" doc:id="3d1982e2-9740-4866-a08e-a29b4e1596c0" config-ref="HTTP_Request_configuration1" path="/load"/>
</flow>
这是一个直观表示的示意图:
所以:
我很困惑流媒体在这个流程中实际上是如何工作的?
我是否需要根据链接文章使用 for each ?
所以假设我在文件中有 100,000 条记录,我在 for 循环中说批量大小为 1000,这是否意味着只有 1000 条记录被加载到内存中并发送到 REST API?
如果是,这是否意味着我将不得不多次调用休息端点? (100 次?)
我只需要一个简单的说明,说明要走的路以及行为是什么?
我已经阅读了很多关于流媒体的文章,但一分钱没有掉!
如果 sizeOf() 无效,请尝试使用 isEmpty() 函数。后者将尝试计算所有行。它超过了 Mule 用于流式传输的缓冲区大小。使用 isEmpty () 也应该更高效。
增加堆大小无济于事,因为该错误是因为超出了流缓冲区大小。我不记得你是否可以在社区版中增加缓冲区大小,但你必须先尝试 isEmpty。
您可以在 SFTP 读取操作后使用 attributes.size
来访问 SFTP 服务器报告的文件大小,避免将整个文件加载到内存中只是为了测量它有多大。
为了探索 Mule 4 中 streaming/chunking 的选项,您确定了一些关键问题。要在给定任意输入文件大小的情况下控制内存占用,我们必须使用以下方法:
- 可重复文件存储流在社区版中不可用
- 您可以启用 streaming in dataweave since you have CSV input。它需要进行一些配置更改,并且您会在运行时丢失转换中的错误检查。
- 您还可以使用带有 For Each 的流式输入来分块输入数据并发送多个 HTTP 请求,而无需同时将所有输入存储在堆上。
如前一个答案所述,您应该使用 attributes.size
来获取文件大小,而不是使用 sizeOf()
重新计算它。这将使您的流程至少通过选择路由器,但是每当您开始 map
您的 csv 到 set-payload
中的 json 时它都会失败。使用您共享的当前配置,增加头部大小将无济于事,您将不得不增加 repeatable-in-memory-stream
的 Max Buffer Size
参数。但还有另一种选择。
您需要先选择正确的流媒体策略。
non-repeatable-stream
如果您要将数据发送到的服务器可以处理分块请求,这可能是您的最佳选择,因为您不想增加您的应用程序的内存,并正在寻找一种以块的形式读取数据的解决方案。 除非您这样做,否则不会将数据加载到内存中。它基本上会使流式有效负载更像 java 的传统 InputStream
,因为 您只能使用一次有效负载。 因此,在继续之前这你需要确保你不需要多次有效载荷。这意味着,如果您使用有效载荷记录这样的行数 sizeOf(payload)
,它将消耗有效载荷,之后您将拥有空的有效载荷。
您将需要进行以下更改。
- Select 流模式为
non-repeatable-stream
.
- 更改选择以使用
attributes.size == 0
而不是 sizeOf
- 更新地图以生成
deferred
输出。如果您以前没有听过这个词,请不要对它感到困惑。它基本上是流式传输输出。在使用它之前请参考这个 documentation。如果你不使用它,它会再次加载内存中这个映射函数的输出。
output application/json deferred=true
---
payload map (value,index)->{
"id": value.column_0,
"name": value.column_1
}
有了这个,即使您将堆限制为 512 MB(您不应该,但可以),您也可以处理这个 700MB 的文件。
注意:此后 HTTP 请求组件将发送 chunked
数据。您可能还想确认您将 JSON 发送到的服务器是否能够正确处理它。
repeatable-in-memory-stream
:如果你使用这个,AFAIK,你将不得不像这样增加 Max Buffer Size
参数,因为你的有效负载非常大。
目前您已将其设置为 1Mb,因此您无法使用 1Mb 数据。您将不得不试验哪些值最适合您。 此流式传输策略需要在您使用有效载荷时将完整的有效载荷加载到内存中(以使其可重复)。如果必须将 Max Buffer Size
增加很多,也可能必须增加堆大小。您还可以使用此参数增加最大内存:-Dmule.max.streaming.memory=1024000000
使用 Mule 4.4 社区版 - 内部部署 我有一个包含员工列表的大型 csv 文件:
101,John Saunders,19,M,Physics,Chemistry,Mechanics
102,Jim White,17,M,Languages,Art,Pottery
...
...
文件大小为700MB
此文件将被放置在一个 SFTP 位置,供 Mule 4.4 社区版读取。 注意 - 我需要解析数据并将其转换为 JSON 然后调用第三方休息服务 如果学生在任何科目中有 'Physics',我只需要获取学生详细信息 所以转换后的 JSON 看起来像这样:
[{"id":"101","name": "John Saunders","age": "19" , "gender": "M"},{...}]
现在我面临的挑战是 - 在读取文件后,我正在检查以确保文件不为空或为空,方法是:
sizeOf(payload) == 0
文件大小大于 1 MB 的代码本身在 Anypoint Studio 中失败并出现以下错误:
Internal execution exception while executing the script, this is most probably a bug, file an issue with the script and the input data. NOTE: Enable dump option using
-Dcom.mulesoft.dw.dump_files=true
to log all required information. Caused by: org.mule.runtime.api.streaming.exception.StreamingBufferSizeExceededException: Buffer has exceeded its maximum size of 1048576 evaluating expression: "sizeOf(payload) == 0
我可以尝试增加可用的堆内存(虽然目前我还不知道具体怎么做)但我的问题是: 我可以想到以下选项:
#1 使用批处理(不是一个选项,因为批处理在 mule 的社区版中不可用)
#2 增加堆内存(不想这样做,因为文件大小会进一步增加,此解决方案将无法扩展)
#3 将大文件分解成较小的文件然后处理
#4 想要学习和探索 mule 4 中流式传输/分块的任何选项
选项 #4 是我想要探索的。 在互联网上搜索,找到一篇文章 here
所以从 sftp 读取文件时 - 我不能使用 'Repeatable file-store stream' 但可以使用 'Repeatable in memory stream'
<sftp:read doc:name="Read Empfile" config-ref="SFTP_Config" path="\working\emp.csv" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
<repeatable-in-memory-stream />
</sftp:read>
这不会将整个文件读入内存吗?或者它只会读取文件的一部分?
这是我正在尝试的完整代码:
<flow name="load_emp" doc:id="81a7b13e-1d38-4722-9015-dd03e765023c" >
<sftp:read doc:name="Read emp file" doc:id="b8662f19-f442-4150-b74d-8339fedee72b" config-ref="SFTP_Config" path="\working\emp.unl" timeBetweenSizeCheck="-1" outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
<repeatable-in-memory-stream />
</sftp:read>
<try doc:name="Try" doc:id="2a0c2d4c-23db-4d49-aa36-634720bb45b8" >
<choice doc:name="Choice" doc:id="e2b67582-f327-4974-9fb4-875af7962e6e" >
<when expression="sizeOf(payload) == 0">
<raise-error doc:name="Raise error" doc:id="290c67ca-4ae6-47f5-8d63-bd54cf122604" type="ANY" description="File is empty"/>
</when>
<otherwise >
<logger level="INFO" doc:name="not empty" doc:id="a68045a7-ba59-4c7a-915e-2145de5c3b4b" />
</otherwise>
</choice>
</try>
<set-payload value='#[output application/json
---
payload map (value,index)->{
"id": value.column_0,
"name": value.column_1
}]' doc:name="Set Payload" doc:id="17277532-f496-4a61-8d9f-136deaeb8104" />
<http:request method="POST" doc:name="Request" doc:id="3d1982e2-9740-4866-a08e-a29b4e1596c0" config-ref="HTTP_Request_configuration1" path="/load"/>
</flow>
这是一个直观表示的示意图:
所以: 我很困惑流媒体在这个流程中实际上是如何工作的? 我是否需要根据链接文章使用 for each ? 所以假设我在文件中有 100,000 条记录,我在 for 循环中说批量大小为 1000,这是否意味着只有 1000 条记录被加载到内存中并发送到 REST API?
如果是,这是否意味着我将不得不多次调用休息端点? (100 次?)
我只需要一个简单的说明,说明要走的路以及行为是什么? 我已经阅读了很多关于流媒体的文章,但一分钱没有掉!
如果 sizeOf() 无效,请尝试使用 isEmpty() 函数。后者将尝试计算所有行。它超过了 Mule 用于流式传输的缓冲区大小。使用 isEmpty () 也应该更高效。
增加堆大小无济于事,因为该错误是因为超出了流缓冲区大小。我不记得你是否可以在社区版中增加缓冲区大小,但你必须先尝试 isEmpty。
您可以在 SFTP 读取操作后使用 attributes.size
来访问 SFTP 服务器报告的文件大小,避免将整个文件加载到内存中只是为了测量它有多大。
为了探索 Mule 4 中 streaming/chunking 的选项,您确定了一些关键问题。要在给定任意输入文件大小的情况下控制内存占用,我们必须使用以下方法:
- 可重复文件存储流在社区版中不可用
- 您可以启用 streaming in dataweave since you have CSV input。它需要进行一些配置更改,并且您会在运行时丢失转换中的错误检查。
- 您还可以使用带有 For Each 的流式输入来分块输入数据并发送多个 HTTP 请求,而无需同时将所有输入存储在堆上。
如前一个答案所述,您应该使用 attributes.size
来获取文件大小,而不是使用 sizeOf()
重新计算它。这将使您的流程至少通过选择路由器,但是每当您开始 map
您的 csv 到 set-payload
中的 json 时它都会失败。使用您共享的当前配置,增加头部大小将无济于事,您将不得不增加 repeatable-in-memory-stream
的 Max Buffer Size
参数。但还有另一种选择。
您需要先选择正确的流媒体策略。
non-repeatable-stream
如果您要将数据发送到的服务器可以处理分块请求,这可能是您的最佳选择,因为您不想增加您的应用程序的内存,并正在寻找一种以块的形式读取数据的解决方案。 除非您这样做,否则不会将数据加载到内存中。它基本上会使流式有效负载更像 java 的传统InputStream
,因为 您只能使用一次有效负载。 因此,在继续之前这你需要确保你不需要多次有效载荷。这意味着,如果您使用有效载荷记录这样的行数sizeOf(payload)
,它将消耗有效载荷,之后您将拥有空的有效载荷。 您将需要进行以下更改。- Select 流模式为
non-repeatable-stream
. - 更改选择以使用
attributes.size == 0
而不是sizeOf
- 更新地图以生成
deferred
输出。如果您以前没有听过这个词,请不要对它感到困惑。它基本上是流式传输输出。在使用它之前请参考这个 documentation。如果你不使用它,它会再次加载内存中这个映射函数的输出。
- Select 流模式为
output application/json deferred=true
---
payload map (value,index)->{
"id": value.column_0,
"name": value.column_1
}
有了这个,即使您将堆限制为 512 MB(您不应该,但可以),您也可以处理这个 700MB 的文件。
注意:此后 HTTP 请求组件将发送 chunked
数据。您可能还想确认您将 JSON 发送到的服务器是否能够正确处理它。
repeatable-in-memory-stream
:如果你使用这个,AFAIK,你将不得不像这样增加Max Buffer Size
参数,因为你的有效负载非常大。
Max Buffer Size
增加很多,也可能必须增加堆大小。您还可以使用此参数增加最大内存:-Dmule.max.streaming.memory=1024000000