Mule 4 batch job: how to handle erroring records
I have a 34 MB XML file that I read with the Mule 4 batch module and write to a database table. I am getting some errors from the operation. I read the records in batches of 100, and when I commit to the database table, if one error occurs all 100 records in the batch are discarded. Is this the usual behavior? Is there a way to drop only the erroring records? I am unable to capture the erroring records through an error handler. Below is the flow.
<flow name="-workday-outbound-investigator-job-history-int073Flow" doc:id="088c7b73-6d0a-4cbc-9ba4-94d149ae1e51" initialState="started" >
<sftp:listener doc:name="On New or Updated File" config-ref="SFTP_Config" directory="${sftp.workingdirectory}" recursive="false" outputMimeType="application/xml" autoDelete="false" >
<scheduling-strategy >
<cron expression="${cron.expression}" />
</scheduling-strategy>
<sftp:matcher filenamePattern="aaa_*.xml" />
</sftp:listener>
<set-variable value="#[attributes.fileName]" doc:name="Set Variable" doc:id="3a2502e8-d1c4-4f0a-b1cd-1fcd17e7276c" variableName="file_name"/>
<ee:transform doc:name="Transform Message" doc:id="5fd8692f-d4d1-4b00-8d6a-36a7981a55d0" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/java
import java!org::apache::commons::lang3::StringUtils
---
payload.*Report_Data.*Report_Entry map ( payload01 , indexOfPayload01 ) ->{
USERNAME: payload01.UserName default null,
ACTION: payload01.Action default null,
EFFDT: payload01.Effdt as Date { class : "java.sql.Date"} default null,
ACTION_REASON: payload01.ActionReason default null,
DTTM_COMPLETE: StringUtils::replace(StringUtils::substring(payload01.DateTimeCompleted,0,19),'T',' ') default null
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<batch:job jobName="-workday-outbound-investigator-job-history-int073Batch_Job" doc:id="c8683f42-0bb5-481c-890c-3b20f035c67d" maxFailedRecords="-1">
<batch:process-records >
<batch:step name="Batch_Step" doc:id="2fa9964f-ebcc-4588-8025-c36c74613521" acceptPolicy="ALL">
<logger level="INFO" doc:name="Logger" doc:id="09298332-6541-47ee-8977-6a42c4ac0f89" message="BATCHINSERTSIZE #[sizeOf(payload)] "/>
<batch:aggregator doc:name="Batch Aggregator" doc:id="f746013d-7c90-4d62-bbe0-02c306e22f3d" size="100">
<try doc:name="Try" doc:id="22c7241f-d136-40cc-95dd-ec51d6fcf6ec" >
<db:bulk-insert doc:name="Bulk insert" config-ref="Oracle_Wint_Database_Config" doc:id="cc38a9d1-3263-4cb2-bd7f-4d04558189eb">
<db:bulk-input-parameters><![CDATA[#[%dw 2.0
output application/java
---
payload map ( val , idx ) ->{
USERNAME: val.USERNAME default null,
ACTION: val.ACTION default null,
EFFDT: val.EFFDT as Date { class : "java.sql.Date"} default null,
ACTION_REASON: val.ACTION_REASON default null,
DTTM_COMPLETE: val.DTTM_COMPLETE default null
}]]]></db:bulk-input-parameters>
<db:sql><![CDATA[INSERT INTO
WD_JOB_HISTORY (
USERNAME,
ACTION,
EFFDT,
ACTION_REASON,
DTTM_COMPLETE
)
VALUES
(
:USERNAME,
:ACTION,
:EFFDT,
:ACTION_REASON,
:DTTM_COMPLETE
)]]></db:sql>
</db:bulk-insert>
</try>
</batch:aggregator>
</batch:step>
</batch:process-records>
<batch:on-complete >
<logger level="INFO" doc:name="Logger" doc:id="41e67041-b38e-4ebb-afab-486b1927470b" message="JOBCOMPLETE"/>
<ee:transform doc:name="Transform Message" doc:id="48a4f377-14c2-47aa-bf54-c33d8402ec3f" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
"Time [milliseconds]": payload.elapsedTimeInMillis as String default '',
"Total Records": payload.totalRecords,
"Successful Records": payload.successfulRecords,
"Failed Records": payload.failedRecords,
"Loaded Records": payload.loadedRecords,
"Processed Records": payload.processedRecords
}
]]></ee:set-payload>
</ee:message>
</ee:transform>
<email:send config-ref="_Email_SMTP" fromAddress="${.mail.from}" subject="#['File completed ' ++ vars.file_name ++ ' in ' ++ dw::System::envVar('WRAPPER_HOSTNAME')]">
<email:to-addresses >
<email:to-address value="${.mail.to}" />
</email:to-addresses>
<email:body contentType="text/html" >
<email:content><![CDATA[#[%dw 2.0
import * from dw::System
output application/json
---
{
"job_stats" : payload,
"status" : "INT073 Investigator Job History completed",
"filename": vars.file_name
}]]]></email:content>
</email:body>
</email:send>
</batch:on-complete>
</batch:job>
<error-handler >
<on-error-continue enableNotifications="true" logException="true" doc:name="On Error Continue" doc:id="aed0a547-f56a-49a8-b448-113c2b944415" >
<logger level="INFO" doc:name="Main Flow Error" doc:id="9fa9041d-fa2a-4c89-9312-4e25399c13f7" message="Error in Main Flow"/>
<email:send config-ref="_Email_SMTP" fromAddress="${.mail.from}" subject="#['File Errored out for Employee_ID ' ++ vars.file_name ++ ' in ' ++ dw::System::envVar('WRAPPER_HOSTNAME')]">
<email:to-addresses>
<email:to-address value="${.mail.to}" />
</email:to-addresses>
<email:body contentType="text/html">
<email:content><![CDATA[#[%dw 2.0
import * from dw::System
output application/json
---
{
"filename": vars.file_name,
"status" : "errored out",
"error_payload" : payload default ''
}]]]></email:content>
</email:body>
</email:send>
</on-error-continue>
</error-handler>
</flow>
You cannot use an error handler in the flow to capture errors from the batch. This is expected, because the batch job executes asynchronously with respect to the flow. Think of it as the flow firing and forgetting the execution of the batch job, and just continuing with the next operation in the flow.
Batch implements some specific error handling for cases like yours. If you want to send a message on failed records, just add a new batch step and set its acceptPolicy attribute to ONLY_FAILURES. Inside that step you can then take appropriate actions to report the errors. See Batch Filters for more details.
Example:
<batch:job jobName="batchJob">
<batch:process-records >
<batch:step name="batchStep1">
...
</batch:step>
<batch:step name="batchStep2" accept-policy="ONLY_FAILURES">
...
</batch:step>
</batch:process-records>
</batch:job>
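A minimal sketch of what could go inside that ONLY_FAILURES step, assuming you just want the failing records written to the application log (the step name and logger message are illustrative, not part of your flow):
<batch:step name="batchStep2" acceptPolicy="ONLY_FAILURES">
<!-- Only records that failed in the previous step reach this step,
     so payload here is an individual failed record -->
<logger level="ERROR" doc:name="Failed Record"
message="#['Failed record: ' ++ write(payload, 'application/json')]" />
</batch:step>
From there you could also aggregate the failed records and reuse the email:send configuration from your flow to report them, instead of only logging.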