为重复发生的事件安排提醒
Schedule reminder for recurring event
我正在使用一个 Web 应用程序,它允许用户在日历上创建事件(一次性或重复发生),并且在事件开始前不久,系统将通知其参与者。我在设计此类通知的流程时遇到了问题,尤其是针对重复发生的事件。
需要考虑的事项:
- Web 应用程序的体系结构使得有许多相同结构的数据库,每个数据库都保留自己的一组用户和事件。因此,对一个数据库的任何查询都需要对数千个其他数据库进行查询。
重复事件可能已排除日期(类似于 RRULE 和 EXDATE 组合)。
用户可以更新事件的时间/重复规则。
该应用程序是用 Python 编写的,并且已经将 Celery 3.1 与 Redis 代理一起使用。使用此设置的解决方案会很好,尽管任何事情都可以。根据我的发现,目前很难用 Celery 动态添加周期性任务。
我正在尝试的解决方案:
周期性任务运行每天一次,扫描每个数据库并添加任务以在适当的时间为当天重复发生的每个事件做通知。
如上生成的每一个任务,其id都临时保存在Redis中。如果用户在安排通知任务后更改当天的活动时间,则该任务将被撤销并替换为新任务。
上述解决方案的示例代码:
在tasks.py
,所有任务到运行:
from celery.task import task as celery_task
from celery.result import AsyncResult
from datetime import datetime
# ...
@celery_task
def create_notify_task():
for account in system.query(Account):
db_session = account.get_session() # get sql alchemy session
for event in db_session.query(Event):
schedule_notify_event(account, partial_event)
@celery_task(name='notify_event_users')
def notify_event_users(account_id, event_id):
# do notification for every event participant
pass
def schedule_notify_event(account, event):
partial_event = event.get_partial_on(datetime.today())
if partial_event:
result = notify_event_users.apply_async(
args = (account.id, event.id),
eta = partial_event.start)
replace_task_id(account.id, event.id, result.id)
else:
replace_task_id(account.id, event.id, None)
def replace_task_id(account_id, event_id, result_id):
key = '{}:event'.format(account_id)
client = redis.get_client()
old_result_id = client.hget(key, event_id)
if old_result_id:
AsyncResult(old_result_id).revoke()
client.hset(key, event_id, result_id)
在event.py
:
# when a user change event's time
def update_event(event, data):
# ...
# update event
# ...
schedule_notify_event(account, event)
Celery 安装文件:
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'create-notify-every-day': {
'task': 'tasks.create_notify_task',
'schedule': crontab(minute=0, hour=0),
'args': (,)
},
}
上述的一些缺点是:
每日任务可能需要很长时间才能运行。最后处理的数据库中的事件必须等待并且可能会被遗漏。提前安排该任务(例如,第二天前 2 小时)可能会缓解这种情况,但是首先 运行 设置(或服务器重启后)有点尴尬。
必须注意不要为同一事件安排两次通知任务(例如,因为 create_notify_task 每天 运行 不止一次... ).
对此有更明智的方法吗?
相关问题:
- Efficient recurring tasks in celery?
好久没有人回答了,忘记这个问题了。无论如何,当时我采用了以下解决方案。我在这里概述一下,以防有人感兴趣。
- 创建事件后,任务会安排在 运行 下一次发生前不久(即下一次通知时间)。计划时间是根据应用的所有重复和例外规则计算的,因此它只是一个简单的 celery 计划一次性任务。
- 当任务 运行s 时,它执行通知作业,并在下一个通知时间安排一个新任务(同样,考虑所有重复和例外规则)。如果没有下一个事件发生,则不会安排新任务。
- 任务的id与事件一起保存在数据库中。如果事件的时间发生变化,任务将被取消,并在新的下一个通知时间安排一个新任务。当任务 运行s 并调度一个新任务时,新任务的 id 被保存在数据库中。
我能想到的一些优缺点:
- 优点:
- 在 celery 中不需要复杂的重复规则,因为任务只安排一个 运行。
- 每个任务都相当小而且速度很快,因为它只需要关心一个事件通知。
- 缺点:
- 任何时候都有大量的celery定时任务等待执行,大概有几十万个量级。我不确定这会如何影响芹菜的性能,所以它可能是也可能不是真正的骗局。到目前为止,系统似乎 运行 很好。
我正在使用一个 Web 应用程序,它允许用户在日历上创建事件(一次性或重复发生),并且在事件开始前不久,系统将通知其参与者。我在设计此类通知的流程时遇到了问题,尤其是针对重复发生的事件。
需要考虑的事项:
- Web 应用程序的体系结构使得有许多相同结构的数据库,每个数据库都保留自己的一组用户和事件。因此,对一个数据库的任何查询都需要对数千个其他数据库进行查询。
重复事件可能已排除日期(类似于 RRULE 和 EXDATE 组合)。
用户可以更新事件的时间/重复规则。
该应用程序是用 Python 编写的,并且已经将 Celery 3.1 与 Redis 代理一起使用。使用此设置的解决方案会很好,尽管任何事情都可以。根据我的发现,目前很难用 Celery 动态添加周期性任务。
我正在尝试的解决方案:
周期性任务运行每天一次,扫描每个数据库并添加任务以在适当的时间为当天重复发生的每个事件做通知。
如上生成的每一个任务,其id都临时保存在Redis中。如果用户在安排通知任务后更改当天的活动时间,则该任务将被撤销并替换为新任务。
上述解决方案的示例代码:
在
tasks.py
,所有任务到运行:from celery.task import task as celery_task from celery.result import AsyncResult from datetime import datetime # ... @celery_task def create_notify_task(): for account in system.query(Account): db_session = account.get_session() # get sql alchemy session for event in db_session.query(Event): schedule_notify_event(account, partial_event) @celery_task(name='notify_event_users') def notify_event_users(account_id, event_id): # do notification for every event participant pass def schedule_notify_event(account, event): partial_event = event.get_partial_on(datetime.today()) if partial_event: result = notify_event_users.apply_async( args = (account.id, event.id), eta = partial_event.start) replace_task_id(account.id, event.id, result.id) else: replace_task_id(account.id, event.id, None) def replace_task_id(account_id, event_id, result_id): key = '{}:event'.format(account_id) client = redis.get_client() old_result_id = client.hget(key, event_id) if old_result_id: AsyncResult(old_result_id).revoke() client.hset(key, event_id, result_id)
在
event.py
:# when a user change event's time def update_event(event, data): # ... # update event # ... schedule_notify_event(account, event)
Celery 安装文件:
from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'create-notify-every-day': { 'task': 'tasks.create_notify_task', 'schedule': crontab(minute=0, hour=0), 'args': (,) }, }
上述的一些缺点是:
每日任务可能需要很长时间才能运行。最后处理的数据库中的事件必须等待并且可能会被遗漏。提前安排该任务(例如,第二天前 2 小时)可能会缓解这种情况,但是首先 运行 设置(或服务器重启后)有点尴尬。
必须注意不要为同一事件安排两次通知任务(例如,因为 create_notify_task 每天 运行 不止一次... ).
对此有更明智的方法吗?
相关问题:
- Efficient recurring tasks in celery?
好久没有人回答了,忘记这个问题了。无论如何,当时我采用了以下解决方案。我在这里概述一下,以防有人感兴趣。
- 创建事件后,任务会安排在 运行 下一次发生前不久(即下一次通知时间)。计划时间是根据应用的所有重复和例外规则计算的,因此它只是一个简单的 celery 计划一次性任务。
- 当任务 运行s 时,它执行通知作业,并在下一个通知时间安排一个新任务(同样,考虑所有重复和例外规则)。如果没有下一个事件发生,则不会安排新任务。
- 任务的id与事件一起保存在数据库中。如果事件的时间发生变化,任务将被取消,并在新的下一个通知时间安排一个新任务。当任务 运行s 并调度一个新任务时,新任务的 id 被保存在数据库中。
我能想到的一些优缺点:
- 优点:
- 在 celery 中不需要复杂的重复规则,因为任务只安排一个 运行。
- 每个任务都相当小而且速度很快,因为它只需要关心一个事件通知。
- 缺点:
- 任何时候都有大量的celery定时任务等待执行,大概有几十万个量级。我不确定这会如何影响芹菜的性能,所以它可能是也可能不是真正的骗局。到目前为止,系统似乎 运行 很好。