Reading and parsing a large CSV file using a 'non-repeatable' stream

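Thanks to harshank bansal, Ryan Hoegg and aled for their suggestions and comments on my previous question.

Using Mule 4.4 Community Edition on premises.

Based on those comments, I am now reading the large CSV file as a non-repeatable stream.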

The file is a CSV file, but it is pipe-delimited:

    101|John Saunders|19|M|Physics|Chemistry|Mechanics
    102|Jim White|17|M|Languages|Art|Pottery
    ...
    ...

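After reading the file, I check for an empty file using `attributes.size == 0`. At this point I use a Set Payload with `deferred=true`.

In debug mode, after this component the payload shows as `java.io.PipedInputStream` (which is fine; this is just an observation).

I then pass this payload to an HTTP Request component that calls a REST endpoint. The REST endpoint is currently mocked and logs the payload it receives.

However, the payload received by the REST endpoint is an empty array.

Do I need to set any attributes on the HTTP Request? I can see attributes such as Request Streaming mode; do we need to configure anything there?

Another doubt: how many times is the REST service expected to be called? Will it be called multiple times, or only once?

Here is the complete code I tried: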

    <flow name="get:employee" >
    
        
        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
        
            <file:read doc:name="Read emp file"  
                config-ref="File_Config" 
                path="/emp_large.unl" 
                outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
                <non-repeatable-stream />
            </file:read>
        
        <try doc:name="Try" >
                <choice doc:name="is the file empty ?" >
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " message="Co-relationId  : #[correlationId]  Empty payload from file: #[vars.fileName] !" category="load.empData" />
                <raise-error doc:name="Raise error on empty file "  type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                
            </otherwise>
        </choice>
            <set-payload value='#[output application/json deferred=true&#10;&#10;&#10;&#10;---&#10;&#10;&#10;payload map (value,index)-&gt;{&#10;    "id": value.column_0,&#10;    "name": value.column_1&#10;}]' doc:name="Set Payload"  />
            <http:request method="POST" doc:name="Submit products to Rest API"  config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="NEVER">
                <non-repeatable-stream />
                <http:headers ><![CDATA[#[output application/java
---
{
    
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
                <http:response-validator >
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
                <logger level="INFO" doc:name="Logger" category="load.empData" message="Co-relationId  : #[correlationId]  Successfully published emp data to XYZ , response received is : #[payload]"/>
            <error-handler >
                    <on-error-continue doc:name="On Error Continue" >
                        <logger level="ERROR" doc:name="Failure log" message="Co-relationId  : #[correlationId] error encountered after reading  file #[vars.fileName]  , caused by : #[error.detailedDescription]" category="load.empData"/>
                    </on-error-continue>
                </error-handler>
            </try>
        <logger level="INFO" doc:name="Logger"  message="Co-relationId  : #[correlationId]  processing emp data END:" category="load.empData"/>
        
    </flow>

Attaching a screenshot of the flow for easier visualization:

Edit 1: pasting the complete code below:

<flow name="get:employee" doc:id="feffbaae-2873-4248-a043-d51697083b75">
    <logger level="INFO" doc:name="Logger" doc:id="10c9e0bb-7f18-42b8-9378-1225cb546641" message="Co-relationId  : #[correlationId]  processing emp data START:" category="send.empData"/>
        
        <set-variable value='#[now() as String { format: "ddMMuu" }]' doc:name="Set todays date as ddmmyy" doc:id="37de829b-bc84-44f6-88d9-e66c52fabe83" variableName="today"/>
        
        <logger level="INFO" doc:name="Logger" doc:id="0f182868-404c-491e-acab-88832b73d73e" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully moving file "/>
        <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="cec72a48-72b7-436d-a024-fb3986fb3432" millisBetweenRetries="${sftp.retry.frequency}">
            <file:read doc:name="Read emp file" doc:id="e77633d5-5f4f-43a9-862b-9d6076308c2a" 
                config-ref="File_Config" 
                path="C:\emp_large.unl" 
                outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
                <non-repeatable-stream />
            </file:read>
        </until-successful>
        <logger level="INFO" doc:name="Logger" doc:id="438f9b64-8d66-4999-ac72-cdd7ade3cd0f" category="send.empData" message="Co-relationId  : #[correlationId]  After successfully Reading file "/>
        <try doc:name="Try" doc:id="c934788a-28db-4e49-a7cd-ee8eaff026ae" >
                <choice doc:name="is the file empty ?" doc:id="69c57e57-354c-4a01-9810-35fc4228d5d9">
            <when expression="attributes.size == 0">
                <logger level="ERROR" doc:name="Payload is empty " doc:id="08decd27-c8dc-41e7-8c1d-f395f405b248" message="Co-relationId  : #[correlationId]  Empty payload from file!" category="send.empData" />
                <raise-error doc:name="Raise error on empty file " doc:id="ad6e2e62-09f2-4e4a-bc2c-99d7ce2c95c8" type="EMP:FILE_NOT_PRESENT_OR_EMPTY" description="Emp File  either empty or not received or error in reading file content" />
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Payload not empty" doc:id="606550aa-c59e-4ff4-ad15-026a8616845a" message="Co-relationId  : #[correlationId]  File not empty and contains #[sizeOf(payload)] rows" category="send.empData" />
            </otherwise>
        </choice>
            <set-payload value='#[output application/json deferred=true&#10;&#10;&#10;&#10;---&#10;&#10;&#10;&#10;  {"clientId": "abcd",&#10;"employees": payload map (value,index)-&gt;{&#10;    "id": value.column_0,&#10;    "name": value.column_1&#10;}&#10;}]' doc:name="Set Payload" doc:id="5dba41e5-df65-42e3-8899-0c4abf8f8c16" />
            <http:request method="POST" doc:name="Submit products to XYZ" doc:id="575d1c2f-d20f-4174-b7e6-ad0074ea7eb9" config-ref="HTTP_Request_config" path="/emps" requestStreamingMode="AUTO" sendBodyMode="AUTO">
                <non-repeatable-stream />
                <http:headers ><![CDATA[#[output application/java
---
{
    
    "Host" : p('http_emp.host'),
    "Content-Type" : "application/json"
}]]]></http:headers>
                <http:response-validator >
                    <http:success-status-code-validator values="200" />
                </http:response-validator>
            </http:request>
                <logger level="INFO" doc:name="Logger" doc:id="d3988977-bdda-4eac-bf88-508201582a78" category="send.empData" message="Co-relationId  : #[correlationId]  Successfully published product data to XYZ , response received is : #[payload]"/>
            <error-handler >
                    <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue - Move emp file from processing to error folder" doc:id="a02560ca-77fb-443f-88b0-fcc27185ea7c" >
                        <logger level="ERROR" doc:name="Failure log" doc:id="c4a10961-d853-4fc4-87d7-9b5c55750a7c" message="Co-relationId  : #[correlationId] error encountered after reading  file  , caused by : #[error.detailedDescription]" category="send.empData"/>
                    <until-successful maxRetries="${sftp.retry.attempts}" doc:name="Until Successful" doc:id="d8b3e71a-02fc-4c19-be7f-3851288b85a0" millisBetweenRetries="${sftp.retry.frequency}">
                        <sftp:move doc:name="Move file to error" doc:id="22b353e3-ec17-4df5-aa20-4055e0a6cf3b" config-ref="SFTP_Config" sourcePath="#[p('sftp.outputProcessingDir') ++ '/' ++ vars.fileName]" targetPath="${sftp.outputErrorDir}" createParentDirectories="false" overwrite="true"/>
                    </until-successful>
                    <logger level="INFO" doc:name="Logger" doc:id="dcc9dd6b-0734-486d-b0c1-a9fc3fe64348" category="send.empData" message="Co-relationId  : #[correlationId]  successfully moved  file  to error folder"/>
                    </on-error-continue>
                </error-handler>
            </try>
        <logger level="INFO" doc:name="Logger" doc:id="20453f8a-45ad-4e23-98d5-a03a66509b5d" message="Co-relationId  : #[correlationId]  processing emp data END:" category="send.empData"/>
        <error-handler >
            <on-error-continue enableNotifications="false" logException="false" doc:name="On Error Continue" doc:id="3fa169c0-92ad-431b-9440-a21448db4bce" type="RETRY_EXHAUSTED">
                <logger level="ERROR" doc:name="Logger" doc:id="bf68b097-9b02-4770-b0f3-9972bc91a97f" message="Co-relationId  : #[correlationId] while processing emp file Error is : #[error.suppressedErrors[0].errorType] caused by #[error.suppressedErrors[0].detailedDescription]" category="send.empData"/>
            </on-error-continue>
        </error-handler>
    </flow>

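This is another drawback of using a non-repeatable stream. It happens because the breakpoint you placed consumes the InputStream, and an input stream can only be consumed once. So by the time your set-payload executes, it receives an empty payload. That said, I believe the debugger should be smart enough to detect that the payload is a non-repeatable stream and not consume it.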

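Try disabling the breakpoint; if everything else is set up correctly, the request should be sent to the HTTP endpoint. If you want to see the payload, you can temporarily log it once after the set-payload to check a few lines, and then remove the logger (keep in mind that the logger consumes the stream as well, so the HTTP request in that run will receive an empty body). I know this doesn't sound appealing, but it is the only option I can think of in the current version of Anypoint Studio.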
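If you really need to step through the flow in the debugger, one possible workaround (a sketch, not something proposed above) is to temporarily switch the File read to a repeatable in-memory stream and point it at a small sample file, so that inspecting the payload does not exhaust it. The sample path `/emp_sample.unl` and the buffer sizes are hypothetical. The in-memory strategy keeps the content in memory up to `maxBufferSize`, so this is only suitable for a small test file, not the real large one.

```xml
<!-- Debugging only: hypothetical small sample file and buffer sizes -->
<file:read doc:name="Read emp file (debug)"
    config-ref="File_Config"
    path="/emp_sample.unl"
    outputMimeType="text/csv; streaming=true; header=false; separator=|" outputEncoding="UTF-8">
    <!-- Repeatable stream: the debugger can read the payload and
         later processors still see the full content -->
    <repeatable-in-memory-stream initialBufferSize="256"
        bufferSizeIncrement="256"
        maxBufferSize="1024"
        bufferUnit="KB" />
</file:read>
```

Once debugging is done, switch back to `<non-repeatable-stream />` for the real file.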

Edit: another reason you are sending an empty array is that logging `sizeOf(payload)` consumes the InputStream. So your set-payload is reading from an already-exhausted stream.
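A minimal sketch of the `otherwise` branch from the Edit 1 flow that leaves the stream untouched: it logs the file size from the attributes (which the `when` condition already uses) instead of calling `sizeOf(payload)`. Note that `attributes.size` is the file size in bytes, not the number of rows; a row count cannot be obtained up front without reading the stream.

```xml
<otherwise>
    <!-- Reads only the file attributes; the non-repeatable payload stream is not consumed -->
    <logger level="INFO" doc:name="Payload not empty"
        message="Co-relationId  : #[correlationId]  File not empty, size in bytes: #[attributes.size]"
        category="send.empData" />
</otherwise>
```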

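Regarding your question:

How many times is the REST service expected to be called? Will it get called multiple times or only as a single request?

This essentially uses `Transfer-Encoding: chunked`. The data is sent as a single request, but because the payload is not generated before the request starts, it is sent in chunks until the request completes.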
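For reference, the HTTP Request operation controls this through `requestStreamingMode` (`AUTO`, `ALWAYS`, `NEVER`). The sketch below assumes the same `HTTP_Request_config` as in the question. `ALWAYS` forces a chunked request, while `AUTO` typically streams when the body is a stream without a known length. With `NEVER`, as in the first version of the flow, the connector has to read the whole payload first so it can send a `Content-Length` header, which works against streaming a large file. In either streaming mode the endpoint should still see one POST per flow execution.

```xml
<http:request method="POST" doc:name="Submit products to XYZ"
    config-ref="HTTP_Request_config" path="/emps"
    requestStreamingMode="ALWAYS">
    <!-- ALWAYS: the body is sent with Transfer-Encoding: chunked while the
         deferred DataWeave output is still being produced -->
    <http:headers><![CDATA[#[output application/java
---
{
    "Content-Type" : "application/json"
}]]]></http:headers>
</http:request>
```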