使用 ETA/Countdown 实现任务队列(如 Celery)
Implement a TaskQueue (like Celery) with ETA/Countdown
许多流行的任务队列(如GoogleGAE TaskQueue、Celery)都有ETA/Countdown特性,允许在xxx秒后将任务放入队列。
我正在处理一个需要具有 ETA 功能的任务队列的项目。但是,我必须使用 Google Pubsub 消息传递系统有一些限制。 Pubsub 没有 ETA 功能。我想知道如何为任务队列实现可靠且可扩展的 ETA 机制。欢迎一般架构想法和实际代码示例。
我们的系统排队 600-2000 tasks/second,其中大约 10% 需要有 ETA。它是一个分布式系统并且对性能至关重要。
我试图追踪 celery 的来源 code,但找不到处理 ETA 的实际逻辑。如果有人可以将我指向处理 ETA 的 Celery file/code,那也很好。
我想我可能已经找到了 Celery 是如何做到的。在 eventlet.py 中,它使用 eventlet 的 spawn_after 功能来延迟 worker 创建 "ETA" 秒。
secs = max(eta - monotonic(), 0)
g = self._spawn_after(secs, entry)
许多流行的任务队列(如GoogleGAE TaskQueue、Celery)都有ETA/Countdown特性,允许在xxx秒后将任务放入队列。
我正在处理一个需要具有 ETA 功能的任务队列的项目。但是,我必须使用 Google Pubsub 消息传递系统有一些限制。 Pubsub 没有 ETA 功能。我想知道如何为任务队列实现可靠且可扩展的 ETA 机制。欢迎一般架构想法和实际代码示例。
我们的系统排队 600-2000 tasks/second,其中大约 10% 需要有 ETA。它是一个分布式系统并且对性能至关重要。
我试图追踪 celery 的来源 code,但找不到处理 ETA 的实际逻辑。如果有人可以将我指向处理 ETA 的 Celery file/code,那也很好。
我想我可能已经找到了 Celery 是如何做到的。在 eventlet.py 中,它使用 eventlet 的 spawn_after 功能来延迟 worker 创建 "ETA" 秒。
secs = max(eta - monotonic(), 0)
g = self._spawn_after(secs, entry)