队列前面的队列

A queue ahead of the queue

我正在使用 ColdFusion 网关触发并忘记大量操作。为此,我有一个循环,它通过一个末尾带有 SendGatewayMessage() 的查询。但是,我循环执行的查询可能会变得非常大。 (超过 100.000 条记录)

为了防止操作丢失,我增加了队列大小和线程数。

因为操作仍然丢失,所以我在 SendGatewayMessage() 之前添加了一个循环,如下所示:

<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
    <cfset guardianCount = guardianCount+1>
</cfloop>
<cflog file="gatewayGuardian" text="#i# waited for #guardianCount# iterations. Queuesize:#gatewayService.getQueueSize()#">
<cfset SendGatewayMessage("EventGateway",eventData)>

(有关网关服务的更多信息 class here

这或多或少是可以接受的,因为我可以将请求超时增加到几个小时(!),但我仍在寻找一种更有效的方法来减慢向队列发送消息的速度,希望如此整个过程将进行得更快,对服务器资源的压力更小。

有什么建议吗? 对进一步增加队列大小的后果有什么想法吗?

现在,我使用应用程序变量来跟踪整个作业中的记录、已处理的批次数和已处理的记录数。 在工作开始时,我有一段代码可以启动所有这些变量,如下所示:

<cfif not structKeyExists(application,"batchNumber") or application.batchNumber
 eq 0 or application.batchNumber eq "">
    <cfset application.batchNumber = 0>
    <cfset application.recordsToDo = 0>
    <cfset application.recordsDone = 0>
    <cfset application.recordsDoneErrors = 0>
</cfif>

之后,我设置一个查询中的所有记录,并确定我们需要在当前批次中处理该查询中的哪些记录。 批次中的记录量由记录总量和最大队列大小决定。这样,每个批次将永远不会占据队列的一半以上。这确保该作业永远不会干扰其他操作或作业,并且初始请求不会超时。

<cfset application.recordsToSync = qryRecords.recordcount>
<cfif not structKeyExists(application,"recordsPerBatch") or application.recordsPerBatch eq "" or application.recordsPerBatch eq 0>
    <cfset application.recordsPerBatch = ceiling(application.recordsToDo/(ceiling(application.recordsToDo/gatewayService.getMaxQueueSize())+1))>
</cfif>
<cfset startRow = (application.recordsPerBatch*application.batchNumber)+1>
<cfset endRow = startRow + application.recordsPerBatch-1>
<cfif endRow gt application.recordsToDo>
    <cfset endRow = application.recordsToDo>
</cfif>

然后我使用 from/to 循环遍历查询以触发网关事件。我保留了监护人,所以永远不会因为队列已满而丢失记录。

<cfloop from="#startRow#" to="#endRow#" index="i">
    <cfset guardianCount = 0>
    <!--- load all values from the record into a struct --->
    <cfset stRecordData = structNew()>
    <cfloop list="#qryRecords.columnlist#" index="columnlabel">
        <cfset stRecordData[columnlabel] = trim(qryRecords[columnlabel][i])>
    </cfloop>
    <cfset eventData = structNew()>
    <cfset eventData.stData = stRecordData>
    <cfset eventData.action = "bigJob">
    <cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
        <cfset guardianCount = guardianCount++>
    </cfloop>
    <cfset SendGatewayMessage("eventGateway",eventData)>
</cfloop>

每当记录完成时,我都有一个功能来检查完成的数量与要完成的记录数量。当它们相同时,我就完成了。否则我们可能需要开始新的一批。 请注意,检查我们是否完成是在 cflock 中,但实际事件 post 不是。这是因为当您 posted 的事件无法读取您在锁中使用的变量时,您可能会遇到死锁。

我希望这对某人有用,或者其他人还有更好的主意。

<cflock timeout="30" name="jobResult">
    <cfset application.recordsDone++>
    <cfif application.recordsDone eq application.recordsToDo> 
        <!--- We are done. Set all the application variables we used back to zero, so they do not get in the way when we start the job again --->
        <cfset application.batchNumber = 0>
        <cfset application.recordsToDo = 0>
        <cfset application.recordsDone = 0>
        <cfset application.recordsPerBatch = 0>
        <cfset application.recordsDoneErrors = 0>
        <cfset application.JobStarted = 0>
        <!--- If the number of records we have done is the same as the number of records in a batch times the current batchnumber plus one, we are done with the batch. --->
    <cfelseif application.recordsDone eq application.recordsPerBatch*(application.batchNumber+1)
        and application.recordsDone neq application.recordsToDo>
        <cfset application.batchNumber++>
        <cfset doEventAnnounce = true>
    </cfif>
</cflock>
<cfif doEventAnnounce>
<!--- Fire off the event that starts the job. All the info it needs is in the applicationscope. --->
    <cfhttp url="#URURLHERE#/index.cfm" method="post">
        <cfhttpparam type="url" name="event" value="startBigJob">
    </cfhttp>
</cfif>