Reading and parsing a large CSV file using a 'non-repeatable' stream
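Thanks to harshank bansal, Ryan Hoegg, and aled for their suggestions and comments on my previous question.
I am using Mule 4.4 Community Edition, running locally.
Based on their comments, I am now reading the large CSV file as a non-repeatable stream.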
The file is a CSV file, but pipe-delimited:
101|John Saunders|19|M|Physics|Chemistry|Mechanics
102|Jim White|17|M|Languages|Art|Pottery
...
...
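For reference, with header=false and separator=| the CSV reader exposes each row's fields as column_0, column_1, and so on, which the set-payload in the flow maps to id and name. Here is a minimal Java sketch of that mapping, assuming only the first two columns matter. PipeCsv, parseLine, and parseAll are hypothetical names, not Mule or DataWeave APIs:

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper, not Mule/DataWeave code: builds the same "id"/"name"
// objects the flow builds from column_0 and column_1 of each row.
class PipeCsv {
    // "|" is a regex metacharacter, so it must be escaped.
    private static final Pattern PIPE = Pattern.compile("\\|");

    // Splits one row; limit -1 keeps trailing empty columns.
    static Map<String, String> parseLine(String line) {
        String[] cols = PIPE.split(line, -1);
        return Map.of("id", cols[0], "name", cols[1]);
    }

    // Reads rows lazily, skipping blank lines; the reader is consumed once.
    static List<Map<String, String>> parseAll(BufferedReader reader) {
        return reader.lines()
                .filter(line -> !line.isBlank())
                .map(PipeCsv::parseLine)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String sample = "101|John Saunders|19|M|Physics|Chemistry|Mechanics\n"
                + "102|Jim White|17|M|Languages|Art|Pottery\n";
        for (Map<String, String> emp : parseAll(new BufferedReader(new StringReader(sample)))) {
            System.out.println(emp.get("id") + " -> " + emp.get("name"));
        }
    }
}
```

Running main prints "101 -> John Saunders" and "102 -> Jim White".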
After reading the file, I check for an empty file using attributes.size == 0.
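attributes.size is the file size reported in the File connector's attributes, so evaluating it reads metadata rather than the payload stream. A rough Java analogue, assuming a local file: Files.size asks the file system for the length without opening the content, so the single real consumer of the stream still gets every byte. EmptyCheck is a hypothetical helper name:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Analogue of attributes.size == 0: decide emptiness from file metadata
// instead of reading the content stream.
class EmptyCheck {
    // Files.size queries the file system for the length; no content is read.
    static boolean isEmpty(Path file) throws IOException {
        return Files.size(file) == 0;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("emp_large", ".unl");
        try {
            Files.write(tmp, "101|John Saunders|19\n".getBytes(StandardCharsets.UTF_8));
            System.out.println("empty? " + isEmpty(tmp));
            // The one real consumer still receives every byte.
            try (InputStream in = Files.newInputStream(tmp)) {
                System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```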
At this point I use a Set Payload with deferred=true.
In debug mode, after this component, the payload shows up as java.io.PipedInputStream (fine, this is just an observation).
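A PipedInputStream suggests that the deferred output is written by a producer while the consumer reads it, so nothing is materialized in memory up front. The sketch below assumes that general pattern and is not Mule's implementation: a writer thread emits a JSON array into a PipedOutputStream and the caller reads it from the connected PipedInputStream. DeferredJson, deferredArray, and readAll are hypothetical names:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustration of a deferred payload (not Mule's implementation): the JSON is
// produced by a writer thread while the caller reads the connected pipe.
class DeferredJson {
    static InputStream deferredArray(List<String> ids) throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        Thread writer = new Thread(() -> {
            // Closing the output end signals end-of-stream to the reader.
            try (PipedOutputStream sink = out) {
                sink.write('[');
                for (int i = 0; i < ids.size(); i++) {
                    if (i > 0) {
                        sink.write(',');
                    }
                    String obj = "{\"id\":\"" + ids.get(i) + "\"}";
                    sink.write(obj.getBytes(StandardCharsets.UTF_8));
                }
                sink.write(']');
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();
        return in;
    }

    // Drains the pipe; afterwards the stream is exhausted.
    static String readAll(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        in.transferTo(buf);
        return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream payload = deferredArray(List.of("101", "102"));
        System.out.println(readAll(payload));
    }
}
```

Once readAll has drained the pipe, a second read returns nothing: the data was produced once and cannot be replayed.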
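Now I pass this payload to an HTTP Request component that calls a REST endpoint. This REST endpoint is currently mocked and logs the payload it receives.
However, the payload this REST endpoint receives is an empty array.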
Do I need to set any attributes on the HTTP Request? I can see attributes such as Request Streaming mode; do we need to configure anything there?
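For context on Request Streaming mode: as I read the HTTP connector documentation, requestStreamingMode accepts AUTO (stream only when the payload is a stream), ALWAYS, and NEVER. A streamed body goes out with Transfer-Encoding: chunked, while a non-streamed body is buffered first so that a Content-Length can be sent. The Java model below encodes that reading as an assumption; BodyFraming and framingHeader are hypothetical and do not reflect the connector's source code:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical model of a request-streaming setting (AUTO, ALWAYS, NEVER);
// it encodes an assumption about the behavior, not connector source code.
class BodyFraming {
    enum Mode { AUTO, ALWAYS, NEVER }

    // body must be either a byte[] (already in memory) or an InputStream.
    static String framingHeader(Mode mode, Object body) throws IOException {
        boolean isStream = body instanceof InputStream;
        boolean streamIt = mode == Mode.ALWAYS || (mode == Mode.AUTO && isStream);
        if (streamIt) {
            // Length not needed up front: chunks go out as data is produced.
            return "Transfer-Encoding: chunked";
        }
        // Not streaming: the body is materialized first so its length is known.
        // For a one-shot stream, this read is its single consumption.
        long length = isStream
                ? ((InputStream) body).readAllBytes().length
                : ((byte[]) body).length;
        return "Content-Length: " + length;
    }
}
```

Under this model, either setting should deliver the data as long as nothing has drained the stream before the request; the mode changes how the body is framed, not whether it arrives.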
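Another question: how many times is the REST service expected to be called? Will it be called multiple times, or only once?
Here is the complete code I tried: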
<flow name="get:employee" >
    <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
    <file:read doc:name="Read emp file"
        config-ref="File_Config"
        path="/emp_large.unl"
        outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
        <non-repeatable-stream />
    </file:read>
    <try doc:name="Try" >
        <choice doc:name="is the file empty ?" >
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId : #[correlationId] Empty payload from file: #[vars.fileName] !" category="load.empData" />
                <raise-error doc:name="Raise error on empty file " type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File either empty or not received or error in reading file content" />
            </when>
            <otherwise>
            </otherwise>
        </choice>
        <set-payload value='#[output application/json deferred=true --- payload map (value,index)->{ "id": value.column_0, "name": value.column_1 }]' doc:name="Set Payload" />
        <http:request method="POST" doc:name="Submit products to Rest API" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="NEVER">
            <non-repeatable-stream />
            <http:headers ><![CDATA[#[output application/java
---
{
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
            <http:response-validator >
                <http:success-status-code-validator values="200" />
            </http:response-validator>
        </http:request>
        <logger level="INFO" doc:name="Logger" category="load.empData" message="Co-relationId : #[correlationId] Successfully published emp data to XYZ , response received is : #[payload]"/>
        <error-handler >
            <on-error-continue>
                <logger level="ERROR" doc:name="Failure log" message="Co-relationId : #[correlationId] error encountered after reading file #[vars.fileName] , caused by : #[error.detailedDescription]" category="load.empData"/>
            </on-error-continue>
        </error-handler>
    </try>
    <logger level="INFO" doc:name="Logger" message="Co-relationId : #[correlationId] processing emp data END:" category="load.empData"/>
</flow>
Screenshot of the flow attached for easier visualization:
Edit 1: the complete code is pasted below:
<flow name="get:employee" doc:id="feffbaae-2873-4248-a043-d51697083b75">
    <logger level="INFO" doc:name="Logger" doc:id="10c9e0bb-7f18-42b8-9378-1225cb546641" message="Co-relationId : #[correlationId] processing emp data START:" category="send.empData"/>
    <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
    <logger level="INFO" doc:name="Logger" doc:id="0f182868-404c-491e-acab-88832b73d73e" category="send.empData" message="Co-relationId : #[correlationId] After successfully moving file "/>
    <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="cec72a48-72b7-436d-a024-fb3986fb3432" millisBetweenRetries="${sftp.retry.frequency}">
        <file:read doc:name="Read emp file" doc:id="e77633d5-5f4f-43a9-862b-9d6076308c2a"
            config-ref="File_Config"
            path="C:\emp_large.unl"
            outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
            <non-repeatable-stream />
        </file:read>
    </until-successful>
    <logger level="INFO" doc:name="Logger" doc:id="438f9b64-8d66-4999-ac72-cdd7ade3cd0f" category="send.empData" message="Co-relationId : #[correlationId] After successfully Reading file "/>
    <try doc:name="Try" doc:id="c934788a-28db-4e49-a7cd-ee8eaff026ae" >
        <choice doc:name="is the file empty ?" doc:id="69c57e57-354c-4a01-9810-35fc4228d5d9">
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " doc:id="08decd27-c8dc-41e7-8c1d-f395f405b248" message="Co-relationId : #[correlationId] Empty payload from file!" category="send.empData" />
                <raise-error doc:name="Raise error on empty file " doc:id="ad6e2e62-09f2-4e4a-bc2c-99d7ce2c95c8" type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Payload not empty" doc:id="606550aa-c59e-4ff4-ad15-026a8616845a" message="Co-relationId : #[correlationId] File not empty and contains #[sizeOf(payload)] rows" category="send.empData" />
            </otherwise>
        </choice>
        <set-payload value='#[output application/json deferred=true --- {"clientId": "abcd", "employees": payload map (value,index)->{ "id": value.column_0, "name": value.column_1 } }]' doc:name="Set Payload" doc:id="5dba41e5-df65-42e3-8899-0c4abf8f8c16" />
        <http:request method="POST" doc:name="Submit products to XYZ" doc:id="575d1c2f-d20f-4174-b7e6-ad0074ea7eb9" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="AUTO" sendBodyMode="AUTO">
            <non-repeatable-stream />
            <http:headers ><![CDATA[#[output application/java
---
{
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
            <http:response-validator >
                <http:success-status-code-validator values="200" />
            </http:response-validator>
        </http:request>
        <logger level="INFO" doc:name="Logger" doc:id="d3988977-bdda-4eac-bf88-508201582a78" category="send.empData" message="Co-relationId : #[correlationId] Successfully published product data to XYZ , response received is : #[payload]"/>
        <error-handler >
            <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue - Move emp file from processing to error folder" doc:id="a02560ca-77fb-443f-88b0-fcc27185ea7c" >
                <logger level="ERROR" doc:name="Failure log" doc:id="c4a10961-d853-4fc4-87d7-9b5c55750a7c" message="Co-relationId : #[correlationId] error encountered after reading file , caused by : #[error.detailedDescription]" category="send.empData"/>
                <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="d8b3e71a-02fc-4c19-be7f-3851288b85a0" millisBetweenRetries="${sftp.retry.frequency}">
                    <sftp:move doc:name="Move file to error" doc:id="22b353e3-ec17-4df5-aa20-4055e0a6cf3b" config-ref="SFTP_Config" sourcePath="#[p('sftp.outputProcessingDir') ++ '/' ++ vars.fileName]" targetPath="${sftp.outputErrorDir}" createParentDirectories="false" overwrite="true"/>
                </until-successful>
                <logger level="INFO" doc:name="Logger" doc:id="dcc9dd6b-0734-486d-b0c1-a9fc3fe64348" category="send.empData" message="Co-relationId : #[correlationId] successfully moved file to error folder"/>
            </on-error-continue>
        </error-handler>
    </try>
    <logger level="INFO" doc:name="Logger" doc:id="20453f8a-45ad-4e23-98d5-a03a66509b5d" message="Co-relationId : #[correlationId] processing emp data END:" category="send.empData"/>
    <error-handler >
        <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" doc:id="3fa169c0-92ad-431b-9440-a21448db4bce" type="RETRY_EXHAUSTED">
            <logger level="ERROR" doc:name="Logger" doc:id="bf68b097-9b02-4770-b0f3-9972bc91a97f" message="Co-relationId : #[correlationId] while processing emp file Error is : #[error.suppressedErrors[0].errorType] caused by #[error.suppressedErrors[0].detailedDescription]" category="send.empData"/>
        </on-error-continue>
    </error-handler>
</flow>
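This is another drawback of using a non-repeatable stream. It happens because the breakpoint you placed consumes the InputStream, and an input stream can be consumed only once. As a result, your set-payload receives an empty payload when it executes. That said, I believe the debugger should be smart enough to detect that the payload is a non-repeatable stream and not consume it.
Try disabling the breakpoint; if everything else is set up correctly, it should send the request to the HTTP endpoint. If you want to see the payload, you could log a few rows once after the set-payload and then remove the logger. (I know that doesn't sound appealing, but for now it's the only option I can think of in the current version of Anypoint Studio.)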
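The effect is easy to reproduce outside Mule. In this minimal Java sketch, the first drain stands in for whatever inspects the payload (assumed here to be the breakpoint view) and the second stands in for the set-payload that runs next. ConsumeOnce is a hypothetical name, and ByteArrayInputStream is only a stand-in for the file stream:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Minimal illustration: whoever reads a one-shot InputStream first (a debugger
// preview, a logger, ...) leaves nothing for the next component.
class ConsumeOnce {
    // Reads the stream to its end and returns the text.
    static String drain(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the file stream; in the flow this would be the payload.
        InputStream payload = new ByteArrayInputStream(
                "101|John Saunders|19|M\n".getBytes(StandardCharsets.UTF_8));
        String inspected = drain(payload); // e.g. the breakpoint viewing it
        String nextStep = drain(payload);  // what set-payload then receives
        System.out.println("first read: " + inspected.length() + " chars");
        System.out.println("second read: " + nextStep.length() + " chars");
    }
}
```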
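Edit: Another reason you are sending an empty array is that you consume the InputStream when you log sizeOf(payload). Consequently, your set-payload is working on an already empty stream.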
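The same failure can be sketched in Java: counting rows reads the whole one-shot reader, so the later mapping step sees no rows and produces []. countRows and toJson are hypothetical stand-ins for sizeOf(payload) in the logger and the map in the set-payload, not Mule code:

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.stream.Collectors;

// Sketch of the failure mode: counting rows reads the whole one-shot reader,
// so the later mapping step sees no rows and emits an empty JSON array.
class SizeOfPitfall {
    // Stand-in for sizeOf(payload): walks every line to count it.
    static long countRows(BufferedReader rows) {
        return rows.lines().count();
    }

    // Stand-in for the map in set-payload: one {"id": ...} object per row.
    static String toJson(BufferedReader rows) {
        return rows.lines()
                .map(line -> "{\"id\":\"" + line.split("\\|", -1)[0] + "\"}")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        String file = "101|John Saunders\n102|Jim White\n";
        BufferedReader once = new BufferedReader(new StringReader(file));
        System.out.println(countRows(once)); // 2
        System.out.println(toJson(once));    // [] because the reader is exhausted
    }
}
```

With a fresh reader, toJson returns one object per row; the only difference is whether something read the data first. In the flow, that points to dropping sizeOf(payload) from the logger, since the empty-file check already relies on attributes.size.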
To address your question:
How many times is the REST service expected to be called? Will it get called multiple times, or is it only a single request?
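This basically uses Transfer-Encoding: chunked. The data goes out as a single request, but because the payload is not generated before the request starts, it is sent in chunks until the request is complete.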
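To picture what chunked means on the wire: each chunk is its size in hex, CRLF, the bytes, and CRLF, and a zero-size chunk ends the body, so the receiver still sees one request with one body. Below is a minimal Java sketch of that framing as described in RFC 9112, section 7.1. Chunked and writeChunked are hypothetical names, the chunk size is arbitrary, and this is not the HTTP connector's code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of HTTP/1.1 chunked transfer coding: one request body, written as
// size-prefixed chunks while the source is still being read.
class Chunked {
    static void writeChunked(InputStream source, OutputStream wire, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buf = new byte[chunkSize];
        int n;
        while ((n = source.read(buf)) != -1) {
            // chunk-size in hex, CRLF, chunk-data, CRLF
            wire.write((Integer.toHexString(n) + "\r\n").getBytes(StandardCharsets.US_ASCII));
            wire.write(buf, 0, n);
            wire.write("\r\n".getBytes(StandardCharsets.US_ASCII));
        }
        // Zero-length last chunk followed by an empty trailer section ends the body.
        wire.write("0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
    }
}
```

For example, writing "hello world" with a chunk size of 4 produces "4\r\nhell\r\n4\r\no wo\r\n3\r\nrld\r\n0\r\n\r\n": three data chunks and the terminator, all within a single request.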