Django:如何在使用 cron 作业时测试并发问题

Django: how to test for concurrency issues when using cron jobs

我有一个托管在远程服务器上的 Django 应用程序,它 运行 以相对较短的间隔执行一些 cron 作业。这些 cron 作业之一执行从数据库中获取查询集的命令,调用外部 API 并根据 API 的响应更改模型。如果我不小心,cron 作业将在 API 响应之前执行多次,从而导致并发问题和同一模型的多个实例同时更新。

我有不同的策略来避免这个问题,但我想编写我可以在本地 运行 的测试,模拟 API 调用并确保两个 cron 任务不会同时执行尝试同时处理一个对象。我该怎么做?

我的代码看起来像这样(说明问题的目的):

def task():
    qs = MyModel.objects.filter(task_ran=False)
    for model in qs:
        resp = api_call(model.foo)
        model.bar = resp
        model.task_ran = True
        model.save()

那么,我如何编写一个测试,检查如果 task() 在第一次调用完成之前被第二次调用,那么它不会再次更新模型并且 API 不会再被叫到了吧?下面是一个测试的草图,我试图将对 task() 的调用放在单独的线程中,但这会导致测试冻结并且 - 在 KeyboardInterrupt 之后 - 失败

django.db.utils.OperationalError: database "test_db" is being accessed by other users

DETAIL: There is 1 other session using the database.```

@patch("api_call")
def test_task(self, mock_api_call):
    def side_effect(number):
        time.sleep(2)
        return number + 1

    mock_api_call.side_effect = side_effect

    # how to call these simultaneously? threading causes Django to get mad
    task()
    task()

    mock_api_call.assert_called_once()

好的,所以我根据 this answer 找到了答案。基本上,可以通过线程在 Django 中完成测试,但它需要一些东西:

  • 首先,测试 class 本身必须是 TransactionTestCase 的子 class(至少如果它涉及任何数据库恶作剧或使用 .select_for_update我的代码就是这样做的)。
  • 其次,在每个线程中打开的数据库连接应该在线程终止时再次关闭。这可以通过使用 ThreadPoolExecutor 来创建 Future ,然后通过 .add_done_callback.
  • 添加线程完成时的回调函数来完成

这样,测试就可以写成这样了:

import concurrent.futures
from django.db import connections
from django.test import TransactionTestCase

class CronTestCase(TransactionTestCase):
    def on_done(self, future):
        connections.close_all()

    @patch("api_call")
    def test_task(self, mock_api_call):
        # setup the test
        num_threads = 5
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for _ in range(num_threads):
                future = executor.submit(task)
                future.add_done_callback(self.on_done)

        mock_api_call.assert_called_once()