Flask Celery task locking
I'm using Flask with Celery, and I'm trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of doing this: Celery docs, Ensuring a task is only executed one at a time. The example given is for Django, but I'm using Flask. I've done my best to convert it to work with Flask, but I still see myTask1, which has the lock, able to run multiple times.
One thing that isn't clear to me is whether I'm using the cache correctly; I've never used one before, so all of this is new to me. One thing the docs mention but don't explain is this:

Docs note:

> In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

I'm not really sure what that means. Should I be using the cache in conjunction with my database, and if so, how? I'm using mongodb. In my code I just have this setup for the cache, `cache = Cache(app, config={'CACHE_TYPE': 'simple'})`, because that's what's mentioned in the Flask-Cache docs (Flask-Cache Docs).
Another thing that isn't clear to me is whether I need to do anything differently when I call myTask1 from within my Flask route task1.

Here's an example of the code I'm using.
```python
from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time

app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY'] = 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
######################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'
mongo = PyMongo(app)

##############################
# CELERY ARGUMENTS
##############################
app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db",
    "taskmeta_collection": "celery_jobs",
}
app.config['CELERY_TASK_SERIALIZER'] = 'json'

celery = Celery('task', broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)

LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)


@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
    self.update_state(state='IN TASK')
    lock_id = self.name
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'
    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later


@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():
    return render_template('index.html')


@app.route('/task1', methods=['GET', 'POST'])
def task1():
    print('running task1')
    result = myTask1.delay()
    # get async task id
    taskResult = AsyncResult(result.task_id)
    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})
    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():
    print('running task2')
    result = myTask2.delay()
    # get async task id
    taskResult = AsyncResult(result.task_id)
    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})
    return render_template('task2.html')


@app.route('/status', methods=['GET', 'POST'])
def status():
    taskid_list = []
    task_state_list = []
    TaskName_list = []
    allAsyncData = mongo.db.job_task_id.find()
    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')
        TaskName_list.append(doc['TaskName'])
    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')
    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))
```
Final working code
```python
from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis

app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)
# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY'] = 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
######################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'
mongo = PyMongo(app)

##############################
# CELERY ARGUMENTS
##############################
# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db",
    "taskmeta_collection": "celery_jobs",
}
app.config['CELERY_TASK_SERIALIZER'] = 'json'

celery = Celery('task', broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)

LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)


@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))
    lock_id = self.name
    print('lock_id is {}'.format(lock_id))
    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'
    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later


@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():
    return render_template('index.html')


@app.route('/task1', methods=['GET', 'POST'])
def task1():
    print('running task1')
    result = myTask1.delay()
    # get async task id
    taskResult = AsyncResult(result.task_id)
    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})
    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():
    print('running task2')
    result = myTask2.delay()
    # get async task id
    taskResult = AsyncResult(result.task_id)
    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})
    return render_template('task2.html')


@app.route('/status', methods=['GET', 'POST'])
def status():
    taskid_list = []
    task_state_list = []
    TaskName_list = []
    allAsyncData = mongo.db.job_task_id.find()
    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')
        TaskName_list.append(doc['TaskName'])
    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')
    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')
```
Here's also a screenshot (omitted) showing that I ran myTask1 twice and myTask2 once. I now have the expected behavior for myTask1: it is run by a single worker, and if another worker tries to pick it up, it just keeps retrying for as long as I've defined.
With this setup, you should still expect to see workers receiving the task, since the lock is checked inside the task itself. The only difference is that the work won't be performed if the lock is held by another worker.

In the example given in the docs, this is the desired behavior: if a lock already exists, the task simply does nothing and finishes as successful. What you want is slightly different: you want the work to be queued up rather than ignored.

To get the desired effect, you need to make sure the task will be picked up by a worker and performed some time in the future. One way to accomplish this is with retrying.
```python
@task(bind=True, name='my-task')
def my_task(self):
    lock_id = self.name
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            return 'result'
    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later
```
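One usage note: `self.retry` honors the task's `max_retries` setting, which defaults to 3 in Celery, so a long-held lock can exhaust the retries and mark the task as failed. A minimal sketch of lifting that limit (whether you actually want unbounded retries is a judgment call):

```python
@task(bind=True, name='my-task', max_retries=None)  # None disables Celery's retry limit
def my_task(self):
    lock_id = self.name
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return 'result'
    # keep redelivering every 60 s until the lock is free
    raise self.retry(countdown=60)
```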
In your question, you point out this warning from the Celery example you used:

> In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

You mention that you don't really understand what this means. Indeed, the code you show indicates that you didn't heed that warning, because your code uses an inappropriate backend.
Consider this code:

```python
with memcache_lock(lock_id, self.app.oid) as acquired:
    if acquired:
        # do some work
```
Here you want `acquired` to be true for only one thread at a time. If two threads enter the `with` block at the same time, only one should "win" and have `acquired` be true. The thread for which `acquired` is true can then proceed with its work, while the other thread has to skip the work and try to acquire the lock again later. In order to guarantee that only one thread can have `acquired` true, `.add` must be atomic.

Here's some pseudocode of what `.add(key, value)` does:

```
1. if <key> is already in the cache:
2.    return False
3. else:
4.    set the cache so that <key> has the value <value>
5.    return True
```
If the execution of `.add` is not atomic, the following could happen when two threads A and B execute `.add("foo", "bar")`. Assume the cache is empty at the start.

- Thread A executes `1. if "foo" is already in the cache`, finds that `"foo"` is not in the cache, and jumps to line 3, but the thread scheduler switches control to thread B.
- Thread B also executes `1. if "foo" is already in the cache`, and also finds that `"foo"` is not in the cache. So it jumps to line 3 and then executes lines 4 and 5, setting the key `"foo"` to the value `"bar"`, and the call returns `True`.
- Eventually, the scheduler gives control back to thread A, which continues executing lines 3, 4, and 5, also sets the key `"foo"` to the value `"bar"`, and also returns `True`.

What you have here are two `.add` calls that return `True`. If these calls are made inside `memcache_lock`, then both threads can have `acquired` be true. So two threads can do the work at the same time, and your `memcache_lock` is not doing what it should, which is to allow only one thread to work at a time.
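To make the contrast concrete, here is a minimal sketch of what an atomic `.add` has to guarantee. The `ToyAtomicCache` class is purely illustrative (it is not Flask-Caching's implementation); it uses a mutex so the check and the set can never interleave:

```python
import threading

class ToyAtomicCache:
    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()  # serializes the check-then-set

    def add(self, key, value):
        # The whole check-and-set runs under the mutex, so two threads
        # calling add() with the same key can never both get True.
        with self._mutex:
            if key in self._data:       # lines 1-2 of the pseudocode
                return False
            self._data[key] = value     # lines 4-5 of the pseudocode
            return True
```

memcached's `add` command and Redis's `SET key value NX` give the same guarantee server-side, across processes and machines rather than just across threads.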
You were not using a cache that guarantees `.add` is atomic. You initialize it like this:

```python
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
```

The `simple` backend is scoped to a single process, has no thread-safety, and has an `.add` operation which is not atomic. (This, incidentally, does not involve Mongo at all. If you wanted your cache to be backed by Mongo, you'd have to specify a backend made specifically to send data to a Mongo database.)
So you have to switch to another backend, one that guarantees `.add` is atomic. You could follow the lead of the Celery example and use the `memcached` backend, which does have an atomic `.add` operation. I don't use Flask, but I've done essentially what you're doing with Django and Celery, and used the Redis backend successfully to provide the kind of locking you're using here.
I also found this to be a surprisingly hard problem. Inspired mainly by Sebastian's work on implementing a distributed locking algorithm in redis, I wrote up a decorator function.

A key point to bear in mind with this approach is that we lock tasks at the level of the task's argument space: for example, we allow multiple game update/process-order tasks to run concurrently, but only one per game. That's what `argument_signature` achieves in the code below. You can see documentation on how we use this in our stack at this gist:
```python
import base64
from contextlib import contextmanager
import json
import pickle as pkl
import uuid

from backend.config import Config
from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock

rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
dlm = Redlock([{"host": Config.REDIS_HOST}])

TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60  # by default keep cached values around for 8 days
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60  # we can keep cached values around for a shorter period of time

REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
"""


@contextmanager
def redis_lock(lock_name, expires=60):
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    yield lock_acquired
    if lock_acquired:
        rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)


def argument_signature(*args, **kwargs):
    arg_list = [str(x) for x in args]
    kwarg_list = [f"{str(k)}:{str(v)}" for k, v in kwargs.items()]
    return base64.b64encode(f"{'_'.join(arg_list)}-{'_'.join(kwarg_list)}".encode()).decode()


def task_lock(func=None, main_key="", timeout=None):
    def _dec(run_func):
        def _caller(*args, **kwargs):
            with redis_lock(f"{main_key}_{argument_signature(*args, **kwargs)}", timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec
```
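One design point worth spelling out: the lock value is a random UUID, and the release path goes through a Lua script rather than a plain `DEL` so the key is deleted only if it still holds that UUID. A task that outlives its `expires` window therefore can't remove a newer lock owned by someone else. Used directly, the context manager looks like this (the lock name and work function here are hypothetical):

```python
with redis_lock("update_game_42", expires=30) as acquired:
    if acquired:
        process_game_update(42)  # hypothetical unit of work
    else:
        print(TASK_LOCK_MSG)
```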
Implementation in our task definitions file:
```python
@celery.task(name="async_test_task_lock")
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
    print(f"processing game_id {game_id}")
    time.sleep(TASK_LOCK_TEST_SLEEP)
```
How we test it against a local celery cluster:
```python
from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG

class TestTaskLocking(TestCase):
    def test_task_locking(self):
        rds.flushall()
        res1 = async_test_task_lock.delay(3)
        res2 = async_test_task_lock.delay(5)
        self.assertFalse(res1.ready())
        self.assertFalse(res2.ready())
        res3 = async_test_task_lock.delay(5)
        res4 = async_test_task_lock.delay(5)
        self.assertEqual(res3.get(), TASK_LOCK_MSG)
        self.assertEqual(res4.get(), TASK_LOCK_MSG)
        time.sleep(TASK_LOCK_TEST_SLEEP)
        res5 = async_test_task_lock.delay(3)
        self.assertFalse(res5.ready())
```
(As a bonus, there's also a quick example of how to set up `redis_cache`.)
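For instance, with the `redis_cache` object defined above, caching a function's result in Redis is a one-decorator affair. A sketch, assuming the python-redis-cache package's `@cache.cache()` decorator; `fetch_game_record` and the ttl choice are illustrative:

```python
@redis_cache.cache(ttl=DEFAULT_CACHE_EXPIRATION)
def fetch_game_record(game_id):
    # hypothetical expensive lookup; its pickled return value is stored
    # in Redis under a key derived from the function name and arguments
    ...
```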