在芹菜组中设置任务之间的延迟
Set delay between tasks in group in Celery
我有一个 python 应用程序,用户可以在其中启动特定任务。
任务的全部目的是在给定的 URL.
的特定时间间隔内执行给定数量的 POST/GET 请求
因此用户给出 N - 请求数,V - 每秒请求数。
考虑到由于 I/O 延迟,实际 r/s 速度可能更大或更小,设计这样的任务如何更好。
首先,我决定将 Celery 与 Eventlet 一起使用,否则我将需要大量的工作,这是不可接受的。
我的幼稚做法:
- 客户端使用 task.delay()
启动任务
内部任务我做了这样的事情:
@task
def task(number_of_requests, time_period):
for _ in range(number_of_requests):
start = time.time()
params_for_concrete_subtask = ...
# .... do some IO with monkey_patched eventlet requests library
elapsed = (time.time() - start)
# If we completed this subtask to fast
if elapsed < time_period / number_of_requests:
eventlet.sleep(time_period / number_of_requests)
一个工作示例是 here。
如果我们速度太快,我们会尝试等待以保持所需的速度。如果我们太慢,从客户的角度来看是可以的。我们没有违反 requests/second 要求。但是,如果我重新启动 Celery,这会正确恢复吗?
我认为这应该可行,但我认为还有更好的方法。
在 Celery 中,我可以定义一个具有特定速率限制的任务,这几乎可以满足我的需求保证。所以我可以使用 Celery group
功能并写:
@task(rate_limit=...)
def task(...):
#
task_executor = task.s(number_of_requests, time_period)
group(task_executor(params_for_concrete_task) for params_for_concrete_task in ...).delay()
但是我在这里对 rate_limit 进行了硬编码,它是动态的,我看不到更改它的方法。我看到一个例子:
task.s(....).set(... params ...)
但我试图将 rate_limit
传递给 set
方法,但它不起作用。
另一个可能更好的想法是使用 Celery 的周期性任务调度程序。默认的执行周期和定期执行的任务是固定的。
我需要能够动态创建任务,运行 以特定的速率限制定期执行给定的次数。也许我需要 运行 我自己的调度程序,它将从数据库中获取任务?但我没有看到任何关于此的文档。
另一种方法是尝试使用 chain
函数,但我无法弄清楚任务参数之间是否存在延迟。
如果您想动态调整 rate_limit,您可以使用以下代码来完成。它还在运行时创建 chain() 。
运行 这你会看到我们成功地将 rate_limit 从 5/秒改写为 0.5/秒。
test_tasks.py
from celery import Celery, signature, chain
import datetime as dt
app = Celery('test_tasks')
app.config_from_object('celery_config')
@app.task(bind=True, rate_limit=5)
def test_1(self):
print dt.datetime.now()
app.control.broadcast('rate_limit',
arguments={'task_name': 'test_tasks.test_1',
'rate_limit': 0.5})
test_task = signature('test_tasks.test_1').set(immutable=True)
l = [test_task] * 100
chain = chain(*l)
res = chain()
我还尝试覆盖 class 中的属性,但是在 IMO 中 rate_limit 是在工作人员注册任务时设置的,这就是为什么 .set() 没有效果。我在这里推测,必须检查源代码。
解决方案 2
利用上一个调用的结束时间实现自己的等待机制,链中函数的return传递给下一个
所以它看起来像这样:
from celery import Celery, signature, chain
import datetime as dt
import time
app = Celery('test_tasks')
app.config_from_object('celery_config')
@app.task(bind=True)
def test_1(self, prev_endtime=dt.datetime.now(), wait_seconds=5):
wait = dt.timedelta(seconds=wait_seconds)
print dt.datetime.now() - prev_endtime
wait = wait - (dt.datetime.now() - prev_endtime)
wait = wait.seconds
print wait
time.sleep(max(0, wait))
now = dt.datetime.now()
print now
return now
#app.control.rate_limit('test_tasks.test_1', '0.5')
test_task = signature('test_tasks.test_1')
l = [test_task] * 100
chain = chain(*l)
res = chain()
我觉得这个其实比直播靠谱
我有一个 python 应用程序,用户可以在其中启动特定任务。
任务的全部目的是在给定的 URL.
的特定时间间隔内执行给定数量的 POST/GET 请求因此用户给出 N - 请求数,V - 每秒请求数。
考虑到由于 I/O 延迟,实际 r/s 速度可能更大或更小,设计这样的任务如何更好。
首先,我决定将 Celery 与 Eventlet 一起使用,否则我将需要大量的工作,这是不可接受的。
我的幼稚做法:
- 客户端使用 task.delay() 启动任务
内部任务我做了这样的事情:
@task def task(number_of_requests, time_period): for _ in range(number_of_requests): start = time.time() params_for_concrete_subtask = ... # .... do some IO with monkey_patched eventlet requests library elapsed = (time.time() - start) # If we completed this subtask to fast if elapsed < time_period / number_of_requests: eventlet.sleep(time_period / number_of_requests)
一个工作示例是 here。
如果我们速度太快,我们会尝试等待以保持所需的速度。如果我们太慢,从客户的角度来看是可以的。我们没有违反 requests/second 要求。但是,如果我重新启动 Celery,这会正确恢复吗?
我认为这应该可行,但我认为还有更好的方法。
在 Celery 中,我可以定义一个具有特定速率限制的任务,这几乎可以满足我的需求保证。所以我可以使用 Celery group
功能并写:
@task(rate_limit=...)
def task(...):
#
task_executor = task.s(number_of_requests, time_period)
group(task_executor(params_for_concrete_task) for params_for_concrete_task in ...).delay()
但是我在这里对 rate_limit 进行了硬编码,它是动态的,我看不到更改它的方法。我看到一个例子:
task.s(....).set(... params ...)
但我试图将 rate_limit
传递给 set
方法,但它不起作用。
另一个可能更好的想法是使用 Celery 的周期性任务调度程序。默认的执行周期和定期执行的任务是固定的。
我需要能够动态创建任务,运行 以特定的速率限制定期执行给定的次数。也许我需要 运行 我自己的调度程序,它将从数据库中获取任务?但我没有看到任何关于此的文档。
另一种方法是尝试使用 chain
函数,但我无法弄清楚任务参数之间是否存在延迟。
如果您想动态调整 rate_limit,您可以使用以下代码来完成。它还在运行时创建 chain() 。 运行 这你会看到我们成功地将 rate_limit 从 5/秒改写为 0.5/秒。
test_tasks.py
from celery import Celery, signature, chain
import datetime as dt
app = Celery('test_tasks')
app.config_from_object('celery_config')
@app.task(bind=True, rate_limit=5)
def test_1(self):
print dt.datetime.now()
app.control.broadcast('rate_limit',
arguments={'task_name': 'test_tasks.test_1',
'rate_limit': 0.5})
test_task = signature('test_tasks.test_1').set(immutable=True)
l = [test_task] * 100
chain = chain(*l)
res = chain()
我还尝试覆盖 class 中的属性,但是在 IMO 中 rate_limit 是在工作人员注册任务时设置的,这就是为什么 .set() 没有效果。我在这里推测,必须检查源代码。
解决方案 2
利用上一个调用的结束时间实现自己的等待机制,链中函数的return传递给下一个
所以它看起来像这样:
from celery import Celery, signature, chain
import datetime as dt
import time
app = Celery('test_tasks')
app.config_from_object('celery_config')
@app.task(bind=True)
def test_1(self, prev_endtime=dt.datetime.now(), wait_seconds=5):
wait = dt.timedelta(seconds=wait_seconds)
print dt.datetime.now() - prev_endtime
wait = wait - (dt.datetime.now() - prev_endtime)
wait = wait.seconds
print wait
time.sleep(max(0, wait))
now = dt.datetime.now()
print now
return now
#app.control.rate_limit('test_tasks.test_1', '0.5')
test_task = signature('test_tasks.test_1')
l = [test_task] * 100
chain = chain(*l)
res = chain()
我觉得这个其实比直播靠谱