队列前面的队列
A queue ahead of the queue
我正在使用 ColdFusion 网关触发并忘记大量操作。为此,我有一个循环,它通过一个末尾带有 SendGatewayMessage()
的查询。但是,我循环执行的查询可能会变得非常大。 (超过 100.000 条记录)
为了防止操作丢失,我增加了队列大小和线程数。
因为操作仍然丢失,所以我在 SendGatewayMessage()
之前添加了一个循环,如下所示:
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount+1>
</cfloop>
<cflog file="gatewayGuardian" text="#i# waited for #guardianCount# iterations. Queuesize:#gatewayService.getQueueSize()#">
<cfset SendGatewayMessage("EventGateway",eventData)>
(有关网关服务的更多信息 class here)
这或多或少是可以接受的,因为我可以将请求超时增加到几个小时(!),但我仍在寻找一种更有效的方法来减慢向队列发送消息的速度,希望如此整个过程将进行得更快,对服务器资源的压力更小。
有什么建议吗?
对进一步增加队列大小的后果有什么想法吗?
现在,我使用应用程序变量来跟踪整个作业中的记录、已处理的批次数和已处理的记录数。
在工作开始时,我有一段代码可以启动所有这些变量,如下所示:
<cfif not structKeyExists(application,"batchNumber") or application.batchNumber
eq 0 or application.batchNumber eq "">
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsDoneErrors = 0>
</cfif>
之后,我设置一个查询中的所有记录,并确定我们需要在当前批次中处理该查询中的哪些记录。
批次中的记录量由记录总量和最大队列大小决定。这样,每个批次将永远不会占据队列的一半以上。这确保该作业永远不会干扰其他操作或作业,并且初始请求不会超时。
<cfset application.recordsToSync = qryRecords.recordcount>
<cfif not structKeyExists(application,"recordsPerBatch") or application.recordsPerBatch eq "" or application.recordsPerBatch eq 0>
<cfset application.recordsPerBatch = ceiling(application.recordsToDo/(ceiling(application.recordsToDo/gatewayService.getMaxQueueSize())+1))>
</cfif>
<cfset startRow = (application.recordsPerBatch*application.batchNumber)+1>
<cfset endRow = startRow + application.recordsPerBatch-1>
<cfif endRow gt application.recordsToDo>
<cfset endRow = application.recordsToDo>
</cfif>
然后我使用 from/to 循环遍历查询以触发网关事件。我保留了监护人,所以永远不会因为队列已满而丢失记录。
<cfloop from="#startRow#" to="#endRow#" index="i">
<cfset guardianCount = 0>
<!--- load all values from the record into a struct --->
<cfset stRecordData = structNew()>
<cfloop list="#qryRecords.columnlist#" index="columnlabel">
<cfset stRecordData[columnlabel] = trim(qryRecords[columnlabel][i])>
</cfloop>
<cfset eventData = structNew()>
<cfset eventData.stData = stRecordData>
<cfset eventData.action = "bigJob">
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount++>
</cfloop>
<cfset SendGatewayMessage("eventGateway",eventData)>
</cfloop>
每当记录完成时,我都有一个功能来检查完成的数量与要完成的记录数量。当它们相同时,我就完成了。否则我们可能需要开始新的一批。
请注意,检查我们是否完成是在 cflock 中,但实际事件 post 不是。这是因为当您 posted 的事件无法读取您在锁中使用的变量时,您可能会遇到死锁。
我希望这对某人有用,或者其他人还有更好的主意。
<cflock timeout="30" name="jobResult">
<cfset application.recordsDone++>
<cfif application.recordsDone eq application.recordsToDo>
<!--- We are done. Set all the application variables we used back to zero, so they do not get in the way when we start the job again --->
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsPerBatch = 0>
<cfset application.recordsDoneErrors = 0>
<cfset application.JobStarted = 0>
<!--- If the number of records we have done is the same as the number of records in a batch times the current batchnumber plus one, we are done with the batch. --->
<cfelseif application.recordsDone eq application.recordsPerBatch*(application.batchNumber+1)
and application.recordsDone neq application.recordsToDo>
<cfset application.batchNumber++>
<cfset doEventAnnounce = true>
</cfif>
</cflock>
<cfif doEventAnnounce>
<!--- Fire off the event that starts the job. All the info it needs is in the applicationscope. --->
<cfhttp url="#URURLHERE#/index.cfm" method="post">
<cfhttpparam type="url" name="event" value="startBigJob">
</cfhttp>
</cfif>
我正在使用 ColdFusion 网关触发并忘记大量操作。为此,我有一个循环,它通过一个末尾带有 SendGatewayMessage()
的查询。但是,我循环执行的查询可能会变得非常大。 (超过 100.000 条记录)
为了防止操作丢失,我增加了队列大小和线程数。
因为操作仍然丢失,所以我在 SendGatewayMessage()
之前添加了一个循环,如下所示:
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount+1>
</cfloop>
<cflog file="gatewayGuardian" text="#i# waited for #guardianCount# iterations. Queuesize:#gatewayService.getQueueSize()#">
<cfset SendGatewayMessage("EventGateway",eventData)>
(有关网关服务的更多信息 class here)
这或多或少是可以接受的,因为我可以将请求超时增加到几个小时(!),但我仍在寻找一种更有效的方法来减慢向队列发送消息的速度,希望如此整个过程将进行得更快,对服务器资源的压力更小。
有什么建议吗? 对进一步增加队列大小的后果有什么想法吗?
现在,我使用应用程序变量来跟踪整个作业中的记录、已处理的批次数和已处理的记录数。 在工作开始时,我有一段代码可以启动所有这些变量,如下所示:
<cfif not structKeyExists(application,"batchNumber") or application.batchNumber
eq 0 or application.batchNumber eq "">
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsDoneErrors = 0>
</cfif>
之后,我设置一个查询中的所有记录,并确定我们需要在当前批次中处理该查询中的哪些记录。 批次中的记录量由记录总量和最大队列大小决定。这样,每个批次将永远不会占据队列的一半以上。这确保该作业永远不会干扰其他操作或作业,并且初始请求不会超时。
<cfset application.recordsToSync = qryRecords.recordcount>
<cfif not structKeyExists(application,"recordsPerBatch") or application.recordsPerBatch eq "" or application.recordsPerBatch eq 0>
<cfset application.recordsPerBatch = ceiling(application.recordsToDo/(ceiling(application.recordsToDo/gatewayService.getMaxQueueSize())+1))>
</cfif>
<cfset startRow = (application.recordsPerBatch*application.batchNumber)+1>
<cfset endRow = startRow + application.recordsPerBatch-1>
<cfif endRow gt application.recordsToDo>
<cfset endRow = application.recordsToDo>
</cfif>
然后我使用 from/to 循环遍历查询以触发网关事件。我保留了监护人,所以永远不会因为队列已满而丢失记录。
<cfloop from="#startRow#" to="#endRow#" index="i">
<cfset guardianCount = 0>
<!--- load all values from the record into a struct --->
<cfset stRecordData = structNew()>
<cfloop list="#qryRecords.columnlist#" index="columnlabel">
<cfset stRecordData[columnlabel] = trim(qryRecords[columnlabel][i])>
</cfloop>
<cfset eventData = structNew()>
<cfset eventData.stData = stRecordData>
<cfset eventData.action = "bigJob">
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount++>
</cfloop>
<cfset SendGatewayMessage("eventGateway",eventData)>
</cfloop>
每当记录完成时,我都有一个功能来检查完成的数量与要完成的记录数量。当它们相同时,我就完成了。否则我们可能需要开始新的一批。 请注意,检查我们是否完成是在 cflock 中,但实际事件 post 不是。这是因为当您 posted 的事件无法读取您在锁中使用的变量时,您可能会遇到死锁。
我希望这对某人有用,或者其他人还有更好的主意。
<cflock timeout="30" name="jobResult">
<cfset application.recordsDone++>
<cfif application.recordsDone eq application.recordsToDo>
<!--- We are done. Set all the application variables we used back to zero, so they do not get in the way when we start the job again --->
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsPerBatch = 0>
<cfset application.recordsDoneErrors = 0>
<cfset application.JobStarted = 0>
<!--- If the number of records we have done is the same as the number of records in a batch times the current batchnumber plus one, we are done with the batch. --->
<cfelseif application.recordsDone eq application.recordsPerBatch*(application.batchNumber+1)
and application.recordsDone neq application.recordsToDo>
<cfset application.batchNumber++>
<cfset doEventAnnounce = true>
</cfif>
</cflock>
<cfif doEventAnnounce>
<!--- Fire off the event that starts the job. All the info it needs is in the applicationscope. --->
<cfhttp url="#URURLHERE#/index.cfm" method="post">
<cfhttpparam type="url" name="event" value="startBigJob">
</cfhttp>
</cfif>