Apache Camel SQL 批量插入需要很长时间
Apache Camel SQL Batch insertion taking long time
我正在使用 Apache Camel SQL 批量插入过程。
我的应用程序正在从 Active MQ 读取票证,其中包含大约 2000 张票证。
我已经更新批次为100
我正在触发的查询如下:
sql.subs.insertCdr=
insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp)
values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)
SQL批量路由定义如下:
<pipeline>
<log message="Going to insert in database"></log>
<transform>
<method ref="insertionBean" method="subsBatchInsertion"></method>
</transform>
<choice>
<when>
<simple>${in.header.subsCount} == ${properties:batch.size}</simple>
<to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
<log message="Inserted rows ${body}"></log>
</when>
</choice>
</pipeline>
下面是我的java代码:
public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
if (subsBatchCounter > batchSize) {
subsPayLoad.clear();
subsBatchCounter = 1;
}
subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
exchange.getIn().setHeader("subsCount", subsBatchCounter);
subsBatchCounter++;
return subsPayLoad;
}
public Map<String, Object> generateInsert(Cdr cdr) {
Map<String, Object> insert = new HashMap<String, Object>();
try {
insert = BeanUtils.describe(cdr);
} catch (Exception e) {
Logger.sysLog(LogValues.error, this.getClass().getName()+" | "+Thread.currentThread().getStackTrace()[1].getMethodName(), coreException.GetStack(e));
}
for (String name : insert.keySet()) {
Logger.sysLog(LogValues.APP_DEBUG, this.getClass().getName(), name + ":"+ insert.get(name) + "\t");
}
return insert;
}
现在的问题是当 ActiveMQ 中有大约 120 个票时,SQL 批处理应该已经开始将值插入到数据库中。但这需要更多时间。当 ActiveMQ 中有大约 500 张票时,它开始插入过程。
有人可以帮助优化插入过程吗?
或者其他方法?
问题出在 ActiceMQ 消费者数量上。
When i changed the consumer count back to 1, the batch getting updated
on time.
其实在consumer count为10的时候,ticket是并行消费的。这意味着,对于 10 个消费者从 activemq 消费的 100 张票,每个消费者大约有 10 张票,从而增加了更多时间。当任何一位消费者获得 100 张票时,该批次正在更新。
So changing the consumer count to 1 made all the tickets to be
processed by single consumer, and thus performing the batch update
fine.
我正在使用 Apache Camel SQL 批量插入过程。
我的应用程序正在从 Active MQ 读取票证,其中包含大约 2000 张票证。
我已经更新批次为100
我正在触发的查询如下:
sql.subs.insertCdr= insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp) values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)
SQL批量路由定义如下:
<pipeline> <log message="Going to insert in database"></log> <transform> <method ref="insertionBean" method="subsBatchInsertion"></method> </transform> <choice> <when> <simple>${in.header.subsCount} == ${properties:batch.size}</simple> <to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to> <log message="Inserted rows ${body}"></log> </when> </choice> </pipeline>
下面是我的java代码:
public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) { if (subsBatchCounter > batchSize) { subsPayLoad.clear(); subsBatchCounter = 1; } subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class))); exchange.getIn().setHeader("subsCount", subsBatchCounter); subsBatchCounter++; return subsPayLoad; } public Map<String, Object> generateInsert(Cdr cdr) { Map<String, Object> insert = new HashMap<String, Object>(); try { insert = BeanUtils.describe(cdr); } catch (Exception e) { Logger.sysLog(LogValues.error, this.getClass().getName()+" | "+Thread.currentThread().getStackTrace()[1].getMethodName(), coreException.GetStack(e)); } for (String name : insert.keySet()) { Logger.sysLog(LogValues.APP_DEBUG, this.getClass().getName(), name + ":"+ insert.get(name) + "\t"); } return insert; }
现在的问题是当 ActiveMQ 中有大约 120 个票时,SQL 批处理应该已经开始将值插入到数据库中。但这需要更多时间。当 ActiveMQ 中有大约 500 张票时,它开始插入过程。 有人可以帮助优化插入过程吗? 或者其他方法?
问题出在 ActiceMQ 消费者数量上。
When i changed the consumer count back to 1, the batch getting updated on time.
其实在consumer count为10的时候,ticket是并行消费的。这意味着,对于 10 个消费者从 activemq 消费的 100 张票,每个消费者大约有 10 张票,从而增加了更多时间。当任何一位消费者获得 100 张票时,该批次正在更新。
So changing the consumer count to 1 made all the tickets to be processed by single consumer, and thus performing the batch update fine.