Celery - 一个任务使用不同的参数调用一个较小的任务 n 次

Celery - One task calls a smaller task n times with different parameters

这里的基本思想是我希望一个 celery 任务调用一个方法 n 次,然后将许多较小的任务分组,每次任务参数的值都不同。

第一个任务将是一个 for 循环,每次调用一个方法,这又会 group 一些较小的任务。第一个任务将使用 for 循环的进度更新网页。

执行此操作的最佳方法是什么?

我已经尝试了几种方法,包括简单地延迟任务,但是我发现其中一个工人被锁定在第一个任务中,然后分配给它的任何较小的任务都没有。没有得到处理。

我看不到它与 chains 一起工作。

我目前使用 -Ofair 标志来禁用预取,但是这使得它在获取结果时非常慢。

celeryTasks.py

@app.task()
def sweepSubnets(subnets):
    ...
    for subnet in subnets:
        print "subnet:{0}".format(subnet)
        sweep.runSweep(subnet, finished)
        finished += 1
        percent = (float((float(finished)/ float(noSubnets))) * 100)
        current_task.update_state(state='PROGRESS',
        meta={'process_percent': percent, 'subnet' : subnet})

    results = sweep.getResults()
    return results

@app.task()
def ping(ip):
    result = os.system("ping -W 1 -c 1 " + ip + " >/dev/null")
    return (ip,result)

sweep.py

def runSweep(self, subnet, number):
    if self.checkSubnet(subnet):
         print "Pinging {0}".format(subnet)
         netAdd, nodeAdd, poolSize = self.splitSubnet(subnet) 
         pingResults = self.queuePings(netAdd, nodeAdd, poolSize)
         activeResults = self.getActiveResults(pingResults)

        # Adds a tuple to the results list (subnet, active hosts, total hosts)
        self.results.append({"subnet":subnet, "activeNo":len(activeResults), "totalNo":len(pingResults), "active":activeResults, "total":pingResults, "number":number})
    else:
        self.results.append({"subnet":subnet, "activeNo":0, "totalNo":0, "active":[], "total":[], "number":number})

def queuePings(self, netAdd, nodeAdd, poolSize):
    from celeryTasks import ping

    ipToPing = []

    # For each address, puts the address on the job queue
    for i in range(1, poolSize):
        # Checks if the node address is over 254 and increases the network address if so
        nodeAdd = int(nodeAdd) + 1
        if int(nodeAdd) > 255:
            nodeAdd = 0
            netAdd = netAdd.split(".")
            netAdd = netAdd[0] + "." + netAdd[1] + "." + str(int(netAdd[2]) + 1)

        ipToPing.append("{0}.{1}".format(netAdd, nodeAdd))

    job = group(ping.s(ip) for ip in ipToPing)

    result = job.apply_async()
    print "Getting Results"
    return result.get()

对于可能遇到类似问题的任何人,这就是我所做的:

我像以前一样使用一个组来 运行 所有 ping,但我保存了组的 ID,以便我可以 return 它并稍后恢复结果,而不是将它们放在更大的任务。

job = group(ping.s(ip) for ip in ipToPing)
result = job.apply_async()
while not result.ready():
    time.sleep(0.1)
result.save()
return result.id

然后我有一个组 ID 的列表,我在完成更大的任务后恢复并处理这些 ID。

for job in jobList:
     jobIDs = GroupResult.restore(job)
     results = jobIDs.get()