Mule 4 批处理作业如何处理出错的记录

mule 4 batch job how to handle erroring records

我有一个 34 MB 的 XML 文件,我使用 Mule 4 批处理模块读取它并写入数据库表。操作过程中出现了一些错误。我以 100 条记录为一批进行处理,提交到数据库表时,只要有一条记录出错,该批次的全部 100 条记录都会被丢弃。这是正常行为吗?有没有办法只丢弃出错的记录?我无法通过错误处理程序捕获出错的记录。流程如下。

    <!--
      Reads XML report files from SFTP, maps every Report_Entry to a row, bulk-inserts the rows
      into WD_JOB_HISTORY through a batch job (aggregated in groups of 100), and e-mails the
      batch statistics when the job completes.
    -->
    <flow name="-workday-outbound-investigator-job-history-int073Flow" doc:id="088c7b73-6d0a-4cbc-9ba4-94d149ae1e51" initialState="started" >
        <!-- Source: cron-scheduled SFTP listener; only files matching aaa_*.xml are picked up and they are not auto-deleted. -->
        <sftp:listener doc:name="On New or Updated File" config-ref="SFTP_Config" directory="${sftp.workingdirectory}" recursive="false" outputMimeType="application/xml" autoDelete="false"  >
            <scheduling-strategy >
                <cron expression="${cron.expression}" />
            </scheduling-strategy>
            <sftp:matcher filenamePattern="aaa_*.xml" />
        </sftp:listener>
        <!-- Keep the file name in a variable: both notification e-mails below reference vars.file_name. -->
        <set-variable value="#[attributes.fileName]" doc:name="Set Variable" doc:id="3a2502e8-d1c4-4f0a-b1cd-1fcd17e7276c" variableName="file_name"/>
        <!-- Flatten Report_Data/Report_Entry into a list of row objects keyed by the target column names. -->
        <ee:transform doc:name="Transform Message" doc:id="5fd8692f-d4d1-4b00-8d6a-36a7981a55d0" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/java
import java!org::apache::commons::lang3::StringUtils
---
        payload.*Report_Data.*Report_Entry map ( payload01 , indexOfPayload01 ) ->{
            USERNAME: payload01.UserName default null,
            ACTION: payload01.Action default null,
            EFFDT: payload01.Effdt as Date { class : "java.sql.Date"} default null,
            ACTION_REASON: payload01.ActionReason default null,
            DTTM_COMPLETE: StringUtils::replace(StringUtils::substring(payload01.DateTimeCompleted,0,19),'T',' ') default null
        }]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <batch:job jobName="-workday-outbound-investigator-job-history-int073Batch_Job" doc:id="c8683f42-0bb5-481c-890c-3b20f035c67d" maxFailedRecords="-1">
            <batch:process-records >
                <batch:step name="Batch_Step" doc:id="2fa9964f-ebcc-4588-8025-c36c74613521" acceptPolicy="ALL">
                    <logger level="INFO" doc:name="Logger" doc:id="09298332-6541-47ee-8977-6a42c4ac0f89" message="BATCHINSERTSIZE #[sizeOf(payload)] "/>
                    <!--
                      Records are aggregated in groups of 100 and written with a single bulk insert.
                      NOTE(review): a failure inside this bulk insert appears to fail the whole aggregated
                      group rather than only the offending record; the try scope declares no error handler
                      of its own. Confirm whether that is acceptable.
                    -->
                    <batch:aggregator doc:name="Batch Aggregator" doc:id="f746013d-7c90-4d62-bbe0-02c306e22f3d" size="100">
                        <try doc:name="Try" doc:id="22c7241f-d136-40cc-95dd-ec51d6fcf6ec" >
                            <db:bulk-insert doc:name="Bulk insert" config-ref="Oracle_Wint_Database_Config" doc:id="cc38a9d1-3263-4cb2-bd7f-4d04558189eb">
                                <db:bulk-input-parameters><![CDATA[#[%dw 2.0
output application/java
---
        payload map ( val , idx ) ->{
            USERNAME: val.USERNAME default null,
            ACTION: val.ACTION default null,
            EFFDT: val.EFFDT as Date { class : "java.sql.Date"} default null,
            ACTION_REASON: val.ACTION_REASON default null,
            DTTM_COMPLETE: val.DTTM_COMPLETE default null
        }]]]></db:bulk-input-parameters>
                                <db:sql><![CDATA[INSERT INTO
  WD_JOB_HISTORY (
    USERNAME,
    ACTION,
    EFFDT,
    ACTION_REASON,
    DTTM_COMPLETE
  )
VALUES
  (
    :USERNAME,
    :ACTION,
    :EFFDT,
    :ACTION_REASON,
    :DTTM_COMPLETE
  )]]></db:sql>
                            </db:bulk-insert>
                        </try>
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>
            <batch:on-complete >
                <logger level="INFO" doc:name="Logger" doc:id="41e67041-b38e-4ebb-afab-486b1927470b" message="JOBCOMPLETE"/>
                <!-- Turn the batch job result into a JSON summary of the record counters. -->
                <ee:transform doc:name="Transform Message" doc:id="48a4f377-14c2-47aa-bf54-c33d8402ec3f" >
                    <ee:message >
                        <ee:set-payload ><![CDATA[%dw 2.0
 output application/json
 ---
 {
"Time [milliseconds]": payload.elapsedTimeInMillis as String default '',
"Total Records": payload.totalRecords,
"Successful Records": payload.successfulRecords,
"Failed Records": payload.failedRecords,
"Loaded Records": payload.loadedRecords,
"Processed Records": payload.processedRecords
 }
]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <!-- Completion e-mail carrying the job statistics and the processed file name. -->
                <email:send config-ref="_Email_SMTP" fromAddress="${.mail.from}" subject="#['File completed ' ++ vars.file_name ++ ' in ' ++ dw::System::envVar('WRAPPER_HOSTNAME')]">
                    <email:to-addresses >
                        <email:to-address value="${.mail.to}" />
                    </email:to-addresses>
                    <email:body contentType="text/html" >
                        <email:content><![CDATA[#[%dw 2.0
import * from dw::System
output application/json
---
{
"job_stats" : payload,
"status" : "INT073 Investigator Job History completed",
"filename": vars.file_name
}]]]></email:content>
                    </email:body>
                </email:send>
            </batch:on-complete>
        </batch:job>
        <!--
          Flow-level handler: logs and e-mails the error, then lets the flow continue.
          NOTE(review): the batch job runs asynchronously from the flow, so record-level failures
          inside the batch presumably never reach this handler; confirm.
        -->
        <error-handler >
            <on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="aed0a547-f56a-49a8-b448-113c2b944415" >
                <logger level="INFO" doc:name="Main Flow Error" doc:id="9fa9041d-fa2a-4c89-9312-4e25399c13f7" message="Error in Main Flow"/>
                <email:send config-ref="_Email_SMTP" fromAddress="${.mail.from}" subject="#['File Errored out for Employee_ID ' ++ vars.file_name ++ ' in ' ++ dw::System::envVar('WRAPPER_HOSTNAME')]">
                    <email:to-addresses>
                        <email:to-address value="${.mail.to}" />
                    </email:to-addresses>
                    <email:body contentType="text/html">
                        <email:content><![CDATA[#[%dw 2.0
import * from dw::System
output application/json
---
{
"filename": vars.file_name,
"status" : "errored out",
"error_payload" : payload default ''
}]]]></email:content>
                    </email:body>
                </email:send>
            </on-error-continue>
        </error-handler>
    </flow>

您不能使用流中的错误处理程序来捕获批处理中的错误。这是意料之中的,因为批处理作业相对于流程是异步执行的。可以这样理解:流程触发批处理作业后便不再等待其执行结果(即发即弃),而是直接继续执行流程中的下一个操作。

Batch 针对你这类情况实现了专门的错误处理机制。如果想对失败的记录发送消息,只需新增一个批处理步骤,并将其 acceptPolicy 属性设置为 ONLY_FAILURES,然后就可以在该步骤中采取适当的措施来报告错误。更多详细信息请参阅 Batch Filters 文档。

示例:



<!-- Sketch of a batch job with a dedicated step for failed records; "..." stands for the step's processors. -->
<batch:job jobName="batchJob">
    <batch:process-records >
        <!-- Regular processing step. -->
        <batch:step name="batchStep1">
            ...
        </batch:step>
        <!-- acceptPolicy="ONLY_FAILURES": this step is where failed records are reported. -->
        <batch:step name="batchStep2" acceptPolicy="ONLY_FAILURES">
            ...
        </batch:step>
    </batch:process-records>
</batch:job>