How to break a Celery task from inside a task

I am using Celery with Django. Inside a for loop, each iteration schedules a task that sets a value for a relation in the DB:

lawyers = Lawyer.objects.filter(consultation_status=True)
for idx, lawyer in enumerate(lawyers):
    if consultation.lawyer:
        break
    # Schedule one task per lawyer, each one minute after the previous one
    change_offered_lawyer.apply_async((id, lawyer.id), countdown=idx * 60)

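I check a condition on every iteration of the loop and again inside the task. My goal is to break all of the remaining tasks once that condition is met.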

@app.task
def change_offered_lawyer(consultation_id, consulator_id):
    consultation = ConsultationOrder.objects.get(id=consultation_id)
    consulator = Lawyer.objects.get(id=consulator_id)
    if consultation.lawyer:
        # break all tasks (this is the part I don't know how to do)
        return  # placeholder so the function is valid; only skips this task
    consultation.offered_lawyer = consulator
    consultation.save()

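To keep the explanations below simple, we will just use a list of the numbers 1-10. Execution should break at the 4th number, so 1-3 are processed and 4-10 are skipped.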

Solution 1: Use chained tasks

Design summary:

Link the tasks one after another. After a task returns, the next one is called, and so on. Each task must return a value that tells the next task whether to continue or break.
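The hand-off described above can be sketched without a broker. This is a plain-Python stand-in and not Celery itself: `run_chain` is a hypothetical helper that imitates how a chain passes each task's return value as the first argument of the next task, and the `processed` list is added only so the effect can be observed.

```python
def change_offered_lawyer(should_continue, lawyer_id, processed):
    """Stand-in for the task: returns the flag handed to the next link."""
    if not should_continue:
        return False  # an earlier link already asked to break
    if lawyer_id == 4:
        return False  # break condition reached; later links become no-ops
    processed.append(lawyer_id)
    return True


def run_chain(lawyer_ids):
    """Hypothetical helper imitating a chain: feed each result to the next call."""
    processed = []
    should_continue = True  # initial value, set manually for the first link
    for lawyer_id in lawyer_ids:
        should_continue = change_offered_lawyer(should_continue, lawyer_id, processed)
    return processed


processed_ids = run_chain(range(1, 11))
print(processed_ids)  # [1, 2, 3]
```

Note that every link still runs in this design. The links after the break simply return immediately.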

Producer:

from celery import chain

from task import change_offered_lawyer

tasks = []

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == 1:
        # If first item, manually set the initial value for should_continue
        tasks.append(change_offered_lawyer.s(True, lawyer_id))
    else:
        tasks.append(change_offered_lawyer.s(lawyer_id))

chain(*tasks).apply_async()

Consumer:

from celery import shared_task


@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
    if not should_continue:
        print(f"{lawyer_id=} Break...")
        return False

    if lawyer_id == 4:
        print(f"{lawyer_id=} Break now!")
        return False

    print(f"{lawyer_id=} Continue...")
    return True

Logs:

[2021-08-10 03:04:03,294: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] received
[2021-08-10 03:04:03,296: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:04:03,296: WARNING/MainProcess] 

[2021-08-10 03:04:03,299: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] succeeded in 0.003656674999547249s: True
[2021-08-10 03:04:03,300: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] received
[2021-08-10 03:04:03,301: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:04:03,301: WARNING/MainProcess] 

[2021-08-10 03:04:03,302: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] succeeded in 0.001406519999363809s: True
[2021-08-10 03:04:03,303: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] received
[2021-08-10 03:04:03,305: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:04:03,305: WARNING/MainProcess] 

[2021-08-10 03:04:03,307: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] succeeded in 0.0023091260000001057s: True
[2021-08-10 03:04:03,308: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] received
[2021-08-10 03:04:03,313: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:04:03,313: WARNING/MainProcess] 

[2021-08-10 03:04:03,314: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] succeeded in 0.001986995999686769s: False
[2021-08-10 03:04:03,315: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] received
[2021-08-10 03:04:03,318: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:04:03,318: WARNING/MainProcess] 

[2021-08-10 03:04:03,324: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] succeeded in 0.006545270000060555s: False
[2021-08-10 03:04:03,325: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] received
[2021-08-10 03:04:03,327: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:04:03,327: WARNING/MainProcess] 

[2021-08-10 03:04:03,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] succeeded in 0.002028814999903261s: False
[2021-08-10 03:04:03,329: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] received
[2021-08-10 03:04:03,330: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:04:03,330: WARNING/MainProcess] 

[2021-08-10 03:04:03,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] succeeded in 0.0015183160003289231s: False
[2021-08-10 03:04:03,332: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] received
[2021-08-10 03:04:03,333: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:04:03,333: WARNING/MainProcess] 

[2021-08-10 03:04:03,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] succeeded in 0.0019754629993258277s: False
[2021-08-10 03:04:03,336: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] received
[2021-08-10 03:04:03,337: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:04:03,337: WARNING/MainProcess] 

[2021-08-10 03:04:03,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] succeeded in 0.0016348939998351852s: False
[2021-08-10 03:04:03,339: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] received
[2021-08-10 03:04:03,341: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:04:03,341: WARNING/MainProcess] 

[2021-08-10 03:04:03,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] succeeded in 0.00046298599954752717s: False


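Solution 2: Use shared storage to track the state

Design summary:

Use database storage to track whether execution must continue or break. Here we will use Django's caching system for ease of use. Note that this solution may be prone to race conditions if you intend to run the tasks in parallel without a countdown.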
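The flow of this design can be sketched in plain Python. `FakeCache` is a hypothetical in-memory stand-in that only mimics the `get`/`set`/`delete` calls used in this answer; it is not Django's cache, and the `processed` list exists only to make the result visible.

```python
class FakeCache:
    """Hypothetical in-memory stand-in exposing get/set/delete like a cache."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


FLAG_KEY = "change_offered_lawyer"
cache = FakeCache()
processed = []


def change_offered_lawyer(lawyer_id):
    if cache.get(FLAG_KEY) == "break":
        return  # an earlier task already raised the flag
    if lawyer_id == 4:
        cache.set(FLAG_KEY, "break")  # tell all later tasks to skip
        return
    processed.append(lawyer_id)


cache.set(FLAG_KEY, "continue")
for lawyer_id in range(1, 11):  # sequential, like tasks spaced out by countdown
    change_offered_lawyer(lawyer_id)
cache.delete(FLAG_KEY)  # reset once the last task has run

print(processed)  # [1, 2, 3]
```

The check and the set are two separate steps, which is where the race condition comes from: two tasks running at the same moment could both pass the check before either one sets the flag.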

settings.py:

...
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table",
    }
}
...
  • After this, run python manage.py createcachetable

Producer:

from django.core.cache import cache

from task import change_offered_lawyer, reset_cache

cache.set(key='change_offered_lawyer', value='continue', timeout=None)

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == lawyers_count:
        # If last item, reset the cache
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
    else:
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)

Consumer:

from celery import shared_task
from django.core.cache import cache


@shared_task
def change_offered_lawyer(lawyer_id):
    if cache.get('change_offered_lawyer') == 'break':
        print(f"{lawyer_id=} Break...")
        return

    if lawyer_id == 4:
        cache.set(key='change_offered_lawyer', value='break', timeout=None)
        print(f"{lawyer_id=} Break now!")
        return

    print(f"{lawyer_id=} Continue...")


@shared_task
def reset_cache():
    cache.delete('change_offered_lawyer')

Logs:

[2021-08-10 03:10:32,333: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] received
[2021-08-10 03:10:32,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] received
[2021-08-10 03:10:32,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] received
[2021-08-10 03:10:32,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] received
[2021-08-10 03:10:32,344: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] received
[2021-08-10 03:10:32,345: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] received
[2021-08-10 03:10:32,346: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] received
[2021-08-10 03:10:32,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] received
[2021-08-10 03:10:37,258: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:10:37,258: WARNING/MainProcess] 

[2021-08-10 03:10:37,258: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] succeeded in 0.01066518100014946s: None
[2021-08-10 03:10:42,347: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:10:42,347: WARNING/MainProcess] 

[2021-08-10 03:10:42,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] succeeded in 0.0018151690001104726s: None
[2021-08-10 03:10:47,350: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:10:47,350: WARNING/MainProcess] 

[2021-08-10 03:10:47,350: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] succeeded in 0.004217886999867915s: None
[2021-08-10 03:10:52,340: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:10:52,340: WARNING/MainProcess] 

[2021-08-10 03:10:52,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] succeeded in 0.013524283999686304s: None
[2021-08-10 03:10:57,330: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:10:57,330: WARNING/MainProcess] 

[2021-08-10 03:10:57,330: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] succeeded in 0.004878013000052306s: None
[2021-08-10 03:11:02,338: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:11:02,338: WARNING/MainProcess] 

[2021-08-10 03:11:02,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] succeeded in 0.00420587299959152s: None
[2021-08-10 03:11:07,336: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:11:07,336: WARNING/MainProcess] 

[2021-08-10 03:11:07,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] succeeded in 0.005373962000703614s: None
[2021-08-10 03:11:12,327: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:11:12,328: WARNING/MainProcess] 

[2021-08-10 03:11:12,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] succeeded in 0.004143920999922557s: None
[2021-08-10 03:11:17,342: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:11:17,342: WARNING/MainProcess] 

[2021-08-10 03:11:17,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] succeeded in 0.004410948999975517s: None
[2021-08-10 03:11:22,327: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:11:22,327: WARNING/MainProcess] 

[2021-08-10 03:11:22,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] succeeded in 0.007513158000620024s: None
[2021-08-10 03:11:22,332: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] received
[2021-08-10 03:11:22,338: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] succeeded in 0.004719080000540998s: None


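Solution 3: Run the tasks synchronously (not advisable)

Design summary:

Update change_offered_lawyer to return an indicator of whether to continue or break. Then, after calling apply_async(), call get() to receive the result synchronously and break when needed.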
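A minimal sketch of this idea follows, using a thread pool from the standard library as a stand-in for the Celery worker so it runs without a broker. The `produce` function and the `processed` list are hypothetical names for illustration; `executor.submit(...)` plays the role of apply_async() and `future.result()` plays the role of get().

```python
from concurrent.futures import ThreadPoolExecutor


def change_offered_lawyer(lawyer_id):
    """Stand-in task: return True to continue, False to break."""
    return lawyer_id != 4


def produce(lawyer_ids):
    processed = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        for lawyer_id in lawyer_ids:
            # With Celery this would be roughly:
            #   change_offered_lawyer.apply_async((lawyer_id,)).get()
            future = executor.submit(change_offered_lawyer, lawyer_id)
            if not future.result():  # blocks until the task has finished
                break  # nothing after this point is ever submitted
            processed.append(lawyer_id)
    return processed


print(produce(range(1, 11)))  # [1, 2, 3]
```

Unlike the first two solutions, tasks 5-10 are never sent at all. The price is that the producer blocks on every result, and with Celery get() also requires a result backend to be configured.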