Celery/Django/Redis same task getting executed multiple times
In my current project, I need to fetch data from 700+ endpoints and then post that data to another 700+ endpoints. My approach is to use Django + Celery + Redis and give each worker roughly 70 endpoints, so that about 10 workers retrieve the data and then post it.
For this I use a chord to run the tasks in parallel and then calculate how long they took.
The problem is that Celery is running the same task multiple times. task_get_data is the main task: it first fetches the list of websites, splits it into chunks of 70, and then calls task_post_data through a chord.
In the output below you can see website_A, website_B, etc. appearing multiple times. I have checked my data manually and there are no duplicate websites, but when the Celery tasks are submitted, multiple entries get created.
Also, is there any way to monitor the number of workers and what they are working on?
Below is the code:
import datetime
import json
import os

from celery import Celery, chord
from django.conf import settings

# Project-specific helpers (task_helper, PostDataDetails, client,
# get_time_difference) are assumed to be imported from the project's
# own modules, which are not shown here.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_backend.settings')

app = Celery('django_backend', backend='redis://localhost:6379', broker='redis://localhost:6379')
app.config_from_object('django.conf:settings', namespace='CELERY')
# app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


def post_data(json_obj, website):
    md = []  # accumulate all 'data' entries before chunking
    for items in json_obj:
        md = md + items['data']
    n = 50
    list_of_objects = [md[i:i+n] for i in range(0, len(md), n)]
    print("Starting to post data using post__data")
    for items in list_of_objects:
        data_details = PostDataDetails()
        data_details.data = items
        post_data_response = ""
        try:
            post_data_response = client.post_data(post_data_details=data_details)
            print("Successfully posted data with status code " + str(post_data_response.status) + " for " + website)
        except Exception as e:
            # note: if the post itself failed, post_data_response may still be "" here
            return post_data_response.status


def read_from_file():
    with open('./django_backend/data.json') as json_file:
        data = json.load(json_file)
        # print(data)
    return data


def split_list(l, n):
    website_list = [l[i:i+n] for i in range(0, len(l), n)]
    return website_list


# -----------this is the main method---------
@app.task(bind=True, name="task_get_data")
def task_get_data(self):
    start_time = datetime.datetime.now()
    try:
        website = task_helper.get_data()
        task_helper.write_logs("info", "Successfully read from cache")
    except Exception as e:
        task_helper.write_logs("error", "Error in reading from cache. Error" + str(e))
    website_list_chunks = split_list(list(website.keys()), 70)
    callback = get_time_difference.s(start_time)
    try:
        task_helper.write_logs("info", "Starting the task_post_poller_data task to post data")
        header = [task_post_data.s(website_list) for website_list in website_list_chunks]
        result = chord(header)(callback)
        print(result)
        task_helper.write_logs("info", "Successfully completed the task to post data")
    except Exception as e:
        task_helper.write_logs("error", "Error in creating task_post_data task. Error" + str(e))


@app.task(bind=True, name="task_post_data")
def task_post_data(self, website_list=None) -> dict:
    json_object_response = True
    post_data_response = None
    for website in website_list:
        if json_object_response:
            file_data = read_from_file()
        try:
            post_data_response = post_data(file_data, website)
            # pass
        except Exception as e:
            print("error", "Error in creating task_post_poller_data task. Error" + str(e))
    return post_data_response
I start the worker with the command
celery -A django_backend worker -l debug --purge
and I submit the task with this command:
python manage.py shell
>>>from django_backend.tasks import task_get_data
>>>task_get_data.delay()
Below is the console output:
[2021-07-20 19:54:54,789: INFO/ForkPoolWorker-3] Successfully posted data with status code 200 for website_D
[2021-07-20 19:54:54,835: INFO/ForkPoolWorker-5] Successfully posted data with status code 200 for website_E
[2021-07-20 19:54:54,840: INFO/ForkPoolWorker-2] Successfully posted data with status code 200 for website_B
[2021-07-20 19:54:54,843: INFO/ForkPoolWorker-1] Successfully posted data with status code 200 for website_A
[2021-07-20 19:54:54,882: INFO/ForkPoolWorker-6] Successfully posted data with status code 200 for website_P
[2021-07-20 19:54:54,891: INFO/ForkPoolWorker-8] Successfully posted data with status code 200 for website_I
[2021-07-20 19:54:54,895: INFO/ForkPoolWorker-4] Successfully posted data with status code 200 for website_R
[2021-07-20 19:54:55,021: INFO/ForkPoolWorker-3] Successfully posted data with status code 200 for website_D
[2021-07-20 19:54:55,025: INFO/ForkPoolWorker-7] Successfully posted data with status code 200 for website_C
[2021-07-20 19:54:55,073: INFO/ForkPoolWorker-2] Successfully posted data with status code 200 for website_B
[2021-07-20 19:54:55,086: INFO/ForkPoolWorker-1] Successfully posted data with status code 200 for website_A
This is one of the known issues with Celery and Redis. In one of my projects I used to assign a unique identifier to each task in the cache and then, at the start of the task, check whether that key already exists. You can write a context manager for something like this:
import os
from contextlib import contextmanager
from datetime import datetime, timedelta

from django.core.cache import cache


@contextmanager
def task_lock(lock_id, oid, lock_expire_seconds=600, unlock_after_finish=False):
    """
    Be sure that the task runs only once.
    :param lock_id: unique id of the task
    :param oid: unique id of the current job (needed for debugging only)
    :param lock_expire_seconds: the task will be unlocked after x seconds
    :param unlock_after_finish: bool, allow the next task to run after the current one finishes
    """
    timeout_at = datetime.utcnow() + timedelta(seconds=lock_expire_seconds)
    oid = "{}-{}".format(os.environ.get("HOSTNAME", ""), oid)
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, lock_expire_seconds)
    try:
        yield status
    finally:
        # cache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if unlock_after_finish and datetime.utcnow() < timeout_at:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else.
            cache.delete(lock_id)
Then in your task code you can do:
def some_task():
    current_time = datetime.utcnow()  # any value that identifies this run (used for debugging)
    with task_lock("task-lock", current_time, lock_expire_seconds=10) as acquired:
        if acquired:
            # do something
            pass
Otherwise, there are a few other things in the configuration you can play with. For example, check these answers:
- Celery is rerunning long running completed tasks over and over
- Github issue
- Celery/Redis same task being executed multiple times in parallel
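For reference, one broker setting that frequently comes up in these discussions is the Redis visibility timeout: a message that is not acknowledged within that window is redelivered to another worker, which can look exactly like the same task being executed twice. This is only a sketch under the assumption that redelivery is the cause; the value is an arbitrary example, not a recommendation:
# Raise the Redis visibility timeout so long-running (or late-acknowledged)
# tasks are not redelivered to another worker before they finish.
# Pick a value comfortably longer than your longest task.
app.conf.broker_transport_options = {'visibility_timeout': 7200}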