Celery - 一个任务使用不同的参数调用一个较小的任务 n 次
Celery - One task calls a smaller task n times with different parameters
这里的基本思想是我希望一个 celery 任务调用一个方法 n 次,然后将许多较小的任务分组,每次任务参数的值都不同。
第一个任务将是一个 for 循环,每次调用一个方法,这又会 group 一些较小的任务。第一个任务将使用 for 循环的进度更新网页。
执行此操作的最佳方法是什么?
我已经尝试了几种方法,包括简单地延迟任务,但是我发现其中一个工人被锁定在第一个任务中,然后分配给它的任何较小的任务都没有。没有得到处理。
我看不到它与 chains 一起工作。
我目前使用 -Ofair 标志来禁用预取,但是这使得它在获取结果时非常慢。
celeryTasks.py
@app.task()
def sweepSubnets(subnets):
...
for subnet in subnets:
print "subnet:{0}".format(subnet)
sweep.runSweep(subnet, finished)
finished += 1
percent = (float((float(finished)/ float(noSubnets))) * 100)
current_task.update_state(state='PROGRESS',
meta={'process_percent': percent, 'subnet' : subnet})
results = sweep.getResults()
return results
@app.task()
def ping(ip):
result = os.system("ping -W 1 -c 1 " + ip + " >/dev/null")
return (ip,result)
sweep.py
def runSweep(self, subnet, number):
if self.checkSubnet(subnet):
print "Pinging {0}".format(subnet)
netAdd, nodeAdd, poolSize = self.splitSubnet(subnet)
pingResults = self.queuePings(netAdd, nodeAdd, poolSize)
activeResults = self.getActiveResults(pingResults)
# Adds a tuple to the results list (subnet, active hosts, total hosts)
self.results.append({"subnet":subnet, "activeNo":len(activeResults), "totalNo":len(pingResults), "active":activeResults, "total":pingResults, "number":number})
else:
self.results.append({"subnet":subnet, "activeNo":0, "totalNo":0, "active":[], "total":[], "number":number})
def queuePings(self, netAdd, nodeAdd, poolSize):
from celeryTasks import ping
ipToPing = []
# For each address, puts the address on the job queue
for i in range(1, poolSize):
# Checks if the node address is over 254 and increases the network address if so
nodeAdd = int(nodeAdd) + 1
if int(nodeAdd) > 255:
nodeAdd = 0
netAdd = netAdd.split(".")
netAdd = netAdd[0] + "." + netAdd[1] + "." + str(int(netAdd[2]) + 1)
ipToPing.append("{0}.{1}".format(netAdd, nodeAdd))
job = group(ping.s(ip) for ip in ipToPing)
result = job.apply_async()
print "Getting Results"
return result.get()
对于可能遇到类似问题的任何人,这就是我所做的:
我像以前一样使用一个组来 运行 所有 ping,但我保存了组的 ID,以便我可以 return 它并稍后恢复结果,而不是将它们放在更大的任务。
job = group(ping.s(ip) for ip in ipToPing)
result = job.apply_async()
while not result.ready():
time.sleep(0.1)
result.save()
return result.id
然后我有一个组 ID 的列表,我在完成更大的任务后恢复并处理这些 ID。
for job in jobList:
jobIDs = GroupResult.restore(job)
results = jobIDs.get()
这里的基本思想是我希望一个 celery 任务调用一个方法 n 次,然后将许多较小的任务分组,每次任务参数的值都不同。
第一个任务将是一个 for 循环,每次调用一个方法,这又会 group 一些较小的任务。第一个任务将使用 for 循环的进度更新网页。
执行此操作的最佳方法是什么?
我已经尝试了几种方法,包括简单地延迟任务,但是我发现其中一个工人被锁定在第一个任务中,然后分配给它的任何较小的任务都没有。没有得到处理。
我看不到它与 chains 一起工作。
我目前使用 -Ofair 标志来禁用预取,但是这使得它在获取结果时非常慢。
celeryTasks.py
@app.task()
def sweepSubnets(subnets):
...
for subnet in subnets:
print "subnet:{0}".format(subnet)
sweep.runSweep(subnet, finished)
finished += 1
percent = (float((float(finished)/ float(noSubnets))) * 100)
current_task.update_state(state='PROGRESS',
meta={'process_percent': percent, 'subnet' : subnet})
results = sweep.getResults()
return results
@app.task()
def ping(ip):
result = os.system("ping -W 1 -c 1 " + ip + " >/dev/null")
return (ip,result)
sweep.py
def runSweep(self, subnet, number):
if self.checkSubnet(subnet):
print "Pinging {0}".format(subnet)
netAdd, nodeAdd, poolSize = self.splitSubnet(subnet)
pingResults = self.queuePings(netAdd, nodeAdd, poolSize)
activeResults = self.getActiveResults(pingResults)
# Adds a tuple to the results list (subnet, active hosts, total hosts)
self.results.append({"subnet":subnet, "activeNo":len(activeResults), "totalNo":len(pingResults), "active":activeResults, "total":pingResults, "number":number})
else:
self.results.append({"subnet":subnet, "activeNo":0, "totalNo":0, "active":[], "total":[], "number":number})
def queuePings(self, netAdd, nodeAdd, poolSize):
from celeryTasks import ping
ipToPing = []
# For each address, puts the address on the job queue
for i in range(1, poolSize):
# Checks if the node address is over 254 and increases the network address if so
nodeAdd = int(nodeAdd) + 1
if int(nodeAdd) > 255:
nodeAdd = 0
netAdd = netAdd.split(".")
netAdd = netAdd[0] + "." + netAdd[1] + "." + str(int(netAdd[2]) + 1)
ipToPing.append("{0}.{1}".format(netAdd, nodeAdd))
job = group(ping.s(ip) for ip in ipToPing)
result = job.apply_async()
print "Getting Results"
return result.get()
对于可能遇到类似问题的任何人,这就是我所做的:
我像以前一样使用一个组来 运行 所有 ping,但我保存了组的 ID,以便我可以 return 它并稍后恢复结果,而不是将它们放在更大的任务。
job = group(ping.s(ip) for ip in ipToPing)
result = job.apply_async()
while not result.ready():
time.sleep(0.1)
result.save()
return result.id
然后我有一个组 ID 的列表,我在完成更大的任务后恢复并处理这些 ID。
for job in jobList:
jobIDs = GroupResult.restore(job)
results = jobIDs.get()