从任务失败/uuid 中检索 Celery 任务 kwargs
Retrieving Celery task kwargs from task-failed / uuid
主要问题
我正在测试如何处理某些任务失败,例如处理 'TimeLimitExceeded' 异常,它会立即终止任务而不是 'catchable'(是的......我知道'SoftTimeLimit' 的存在,但它不符合我的需要)。
第一种方法
这是我的 tasks.py
(工人 运行 带有 --time-limit
标志):
import logging
from celery import Celery
import time
app = Celery('tasks', broker='pyamqp://guest@localhost//')
def my_fail(task, exc, req_id, req_args, req_kwargs, einfo, *ext_args, **kwargs):
logger.info("args: %r", req_args)
logger.info("kw: %r", req_kwargs)
@app.task(on_failure=my_fail)
def sum(x, y, delay=0, **kw):
result = x+y
if result == 4:
raise Exception("Some Error")
time.sleep(delay)
return x+y
主要思想是当任务失败时,能够根据任务的args/kwargs进行一些处理
例如,如果我 运行 sum.delay(3, 1, foo="bar")
引发 Exception("Some Error")
并记录以下内容:
[2019-06-30 17:21:45,120: INFO/Worker-1] args: (3, 1)
[2019-06-30 17:21:45,121: INFO/Worker-1] kw: {'foo': 'bar'}
[2019-06-30 17:21:45,122: ERROR/MainProcess] Task tasks.sum[9e9de032-1469-44e7-8932-4c490fcee2e3] raised unexpected: Exception('Some Error',)
Traceback (most recent call last):
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/home/apernin/test/tasks.py", line 89, in sum
raise Exception("Some Error")
Exception: Some Error
请注意 args/kwargs 是由我的 on-failure
处理程序打印的。
现在如果我 运行 sum.delay(3, 2, delay=7)
TimeLimit 被触发
[2019-06-30 17:23:15,244: INFO/MainProcess] Received task: tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,070: ERROR/MainProcess] Task tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde] raised unexpected: TimeLimitExceeded(5.0,)
Traceback (most recent call last):
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/billiard/pool.py", line 645, in on_hard_timeout
raise TimeLimitExceeded(job._timeout)
TimeLimitExceeded: TimeLimitExceeded(5.0,)
[2019-06-30 17:23:21,071: ERROR/MainProcess] Hard time limit (5.0s) exceeded for tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,629: ERROR/MainProcess] Process 'Worker-1' pid:15472 exited with 'signal 15 (SIGTERM)'
请注意 args/kwargs 是注释打印的,因为 on-failure
处理程序未被执行。由于 Celery 的硬时间限制的性质,这在某种程度上是可以预料的。
第二种方法
我的第二种方法是使用事件侦听器。
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
我能够检索到的唯一信息是任务 uuid,我无法检索到任务的名称、args 或 kwargs(任务对象包含属性,但都是 None).
问题
有没有办法:
- 在硬时间限制的情况下制作
on_failure
处理程序?
- 检索具有
task-failed
事件侦听器的任务的任务 args/kwargs?
提前致谢
首先,超时由 Worker(MainProcess)处理,它与任务内部发生的故障不同,例如抛出异常等。这就是为什么您将其视为 TimeLimitExceeded 引发的原因日志中的 MainProcess。所以,不幸的是你不能依赖相同的逻辑...
但是,您的第二种方法将证明有助于追踪正在发生的事情。
我开发了 (in-house) 一个 Celery 监控工具,它可以抓取所有事件,并将它们填充到数据库中,以便稍后我们可以进行各种分析(参见平均值和最差 运行 次示例,失败频率等)。
为了从 task-failed
事件提供的数据中获取您需要的详细信息,您还需要记录(例如将其存储在某个字典中)task-received
事件数据。此信息包含参数、任务名称和您可能需要的所有有用信息。您通过任务 UUID 将它们关联起来。
主要问题
我正在测试如何处理某些任务失败,例如处理 'TimeLimitExceeded' 异常,它会立即终止任务而不是 'catchable'(是的......我知道'SoftTimeLimit' 的存在,但它不符合我的需要)。
第一种方法
这是我的 tasks.py
(工人 运行 带有 --time-limit
标志):
import logging
from celery import Celery
import time
app = Celery('tasks', broker='pyamqp://guest@localhost//')
def my_fail(task, exc, req_id, req_args, req_kwargs, einfo, *ext_args, **kwargs):
logger.info("args: %r", req_args)
logger.info("kw: %r", req_kwargs)
@app.task(on_failure=my_fail)
def sum(x, y, delay=0, **kw):
result = x+y
if result == 4:
raise Exception("Some Error")
time.sleep(delay)
return x+y
主要思想是当任务失败时,能够根据任务的args/kwargs进行一些处理
例如,如果我 运行 sum.delay(3, 1, foo="bar")
引发 Exception("Some Error")
并记录以下内容:
[2019-06-30 17:21:45,120: INFO/Worker-1] args: (3, 1)
[2019-06-30 17:21:45,121: INFO/Worker-1] kw: {'foo': 'bar'}
[2019-06-30 17:21:45,122: ERROR/MainProcess] Task tasks.sum[9e9de032-1469-44e7-8932-4c490fcee2e3] raised unexpected: Exception('Some Error',)
Traceback (most recent call last):
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/home/apernin/test/tasks.py", line 89, in sum
raise Exception("Some Error")
Exception: Some Error
请注意 args/kwargs 是由我的 on-failure
处理程序打印的。
现在如果我 运行 sum.delay(3, 2, delay=7)
TimeLimit 被触发
[2019-06-30 17:23:15,244: INFO/MainProcess] Received task: tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,070: ERROR/MainProcess] Task tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde] raised unexpected: TimeLimitExceeded(5.0,)
Traceback (most recent call last):
File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/billiard/pool.py", line 645, in on_hard_timeout
raise TimeLimitExceeded(job._timeout)
TimeLimitExceeded: TimeLimitExceeded(5.0,)
[2019-06-30 17:23:21,071: ERROR/MainProcess] Hard time limit (5.0s) exceeded for tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,629: ERROR/MainProcess] Process 'Worker-1' pid:15472 exited with 'signal 15 (SIGTERM)'
请注意 args/kwargs 是注释打印的,因为 on-failure
处理程序未被执行。由于 Celery 的硬时间限制的性质,这在某种程度上是可以预料的。
第二种方法
我的第二种方法是使用事件侦听器。
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
我能够检索到的唯一信息是任务 uuid,我无法检索到任务的名称、args 或 kwargs(任务对象包含属性,但都是 None).
问题
有没有办法:
- 在硬时间限制的情况下制作
on_failure
处理程序? - 检索具有
task-failed
事件侦听器的任务的任务 args/kwargs?
提前致谢
首先,超时由 Worker(MainProcess)处理,它与任务内部发生的故障不同,例如抛出异常等。这就是为什么您将其视为 TimeLimitExceeded 引发的原因日志中的 MainProcess。所以,不幸的是你不能依赖相同的逻辑...
但是,您的第二种方法将证明有助于追踪正在发生的事情。
我开发了 (in-house) 一个 Celery 监控工具,它可以抓取所有事件,并将它们填充到数据库中,以便稍后我们可以进行各种分析(参见平均值和最差 运行 次示例,失败频率等)。
为了从 task-failed
事件提供的数据中获取您需要的详细信息,您还需要记录(例如将其存储在某个字典中)task-received
事件数据。此信息包含参数、任务名称和您可能需要的所有有用信息。您通过任务 UUID 将它们关联起来。