组中的芹菜任务和任务已收到但未执行
Celery tasks and task in group received but not executing
我正在开发一个应用程序,旨在根据使用 selenium webdriver 创建的屏幕截图创建演示文稿。技术栈:Python 3.8.6
、Flask 1.1.2
、Celery 4.4.7
、Redis server 4.0.9
Ubuntu 18.04.3 LTS
。
当我只添加一个任务时,Celery worker 工作正常,但是当我尝试一个接一个地调用 2 或 3 个任务时,任务被接收但从未执行。添加任务的时间也会影响工人的行为。当我添加一个任务时——等待大约 2 秒然后添加第二个——所有任务都执行得很好。但是当我尝试添加 3 个任务时 - 总是有问题。有时会执行第一个任务,但其他任务不会执行,但有时 none 会执行。
这是我的代码示例:
我正在 运行宁任务 $.ajax - post:
$.ajax({
type: 'POST',
url: '/presentationTask/1234/4321',
data: {
filters_array: JSON.stringify([
{ filterTable : "Location", filterColumn: "City", filterValue: "Chicago, IL" }
])
},
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, nanobar, div[0]);
},
error: function() {
alert('Unexpected error');
}
});
路由:
@bp.route('/presentationTask/<workspaceId>/<reportId>', methods=['POST'])
def presentationTask(workspaceId,reportId):
filters = request.form['filters_array']
task = createPresentation.apply_async(args=[workspaceId,reportId,filters])
return jsonify({}), 202, {'Location': url_for('tasks.taskstatus',
task_id= task.id)}
并且在方法 "createPresntation" 中我必须多次调用 "createScreen" 任务所以我使用 "signature" 和 "group" 到 运行 组中的任务。我正在等待所有任务完成,然后通过 "join() / join_native()"
加入他们的结果
@celery.task(bind=True)
def createPresentation(self, workspaceId, reportId, filterValues):
self.update_state(state='PENDING')
.
.
.
for filter in json_filters:
for page in json_pages["value"]:
jobList.append(createScreen.signature(args=[workspaceId, reportId, page["Name"],
filter['filterTable'], filter['filterColumn'], filter['filterValue'], currentIndex,
page["displayName"]]))
pageReportJob = group(jobList)
results = pageReportJob.apply_async()
while not results.ready():
current = results.completed_count()
self.update_state(
state='PROGRESS',
meta={'current': current, 'total': total,'status': message})
time.sleep(2)
with allow_join_result():
results.join_native()
....
我正在运行通过命令给 celery worker 设置:
celery worker -A celery_worker.celery --loglevel=info --without-gossip
--without-mingle --without-heartbeat -Ofair
我找到了一个解决方案 - 我安装了 Flower 来监视 Celery 任务,我注意到 Celery 为 2 个主要任务 (createPresentation) 分配内存并等待 (createScreen) 任务执行,但它们永远不会执行因为所有 CPUs/threads 都被任务 (createPresentation) 占用了。所以我创建了 2 个队列,一个优先级为 createScreen,一个默认优先级为 createPresentation。然后我创建芹菜路线并指定每条路线。
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default'),
Queue('priority_high'),
)
CELERY_ROUTES = {
'app.screenshots.services.createScreen': {'queue': 'priority_high'},
'app.presentation.services.createPresentation': {'queue': 'default'},
}
我正在开发一个应用程序,旨在根据使用 selenium webdriver 创建的屏幕截图创建演示文稿。技术栈:Python 3.8.6
、Flask 1.1.2
、Celery 4.4.7
、Redis server 4.0.9
Ubuntu 18.04.3 LTS
。
当我只添加一个任务时,Celery worker 工作正常,但是当我尝试一个接一个地调用 2 或 3 个任务时,任务被接收但从未执行。添加任务的时间也会影响工人的行为。当我添加一个任务时——等待大约 2 秒然后添加第二个——所有任务都执行得很好。但是当我尝试添加 3 个任务时 - 总是有问题。有时会执行第一个任务,但其他任务不会执行,但有时 none 会执行。
这是我的代码示例:
我正在 运行宁任务 $.ajax - post:
$.ajax({
type: 'POST',
url: '/presentationTask/1234/4321',
data: {
filters_array: JSON.stringify([
{ filterTable : "Location", filterColumn: "City", filterValue: "Chicago, IL" }
])
},
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
update_progress(status_url, nanobar, div[0]);
},
error: function() {
alert('Unexpected error');
}
});
路由:
@bp.route('/presentationTask/<workspaceId>/<reportId>', methods=['POST'])
def presentationTask(workspaceId,reportId):
filters = request.form['filters_array']
task = createPresentation.apply_async(args=[workspaceId,reportId,filters])
return jsonify({}), 202, {'Location': url_for('tasks.taskstatus',
task_id= task.id)}
并且在方法 "createPresntation" 中我必须多次调用 "createScreen" 任务所以我使用 "signature" 和 "group" 到 运行 组中的任务。我正在等待所有任务完成,然后通过 "join() / join_native()"
加入他们的结果@celery.task(bind=True)
def createPresentation(self, workspaceId, reportId, filterValues):
self.update_state(state='PENDING')
.
.
.
for filter in json_filters:
for page in json_pages["value"]:
jobList.append(createScreen.signature(args=[workspaceId, reportId, page["Name"],
filter['filterTable'], filter['filterColumn'], filter['filterValue'], currentIndex,
page["displayName"]]))
pageReportJob = group(jobList)
results = pageReportJob.apply_async()
while not results.ready():
current = results.completed_count()
self.update_state(
state='PROGRESS',
meta={'current': current, 'total': total,'status': message})
time.sleep(2)
with allow_join_result():
results.join_native()
....
我正在运行通过命令给 celery worker 设置:
celery worker -A celery_worker.celery --loglevel=info --without-gossip
--without-mingle --without-heartbeat -Ofair
我找到了一个解决方案 - 我安装了 Flower 来监视 Celery 任务,我注意到 Celery 为 2 个主要任务 (createPresentation) 分配内存并等待 (createScreen) 任务执行,但它们永远不会执行因为所有 CPUs/threads 都被任务 (createPresentation) 占用了。所以我创建了 2 个队列,一个优先级为 createScreen,一个默认优先级为 createPresentation。然后我创建芹菜路线并指定每条路线。
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default'),
Queue('priority_high'),
)
CELERY_ROUTES = {
'app.screenshots.services.createScreen': {'queue': 'priority_high'},
'app.presentation.services.createPresentation': {'queue': 'default'},
}