django,多数据库(writer,read-reploicas)和同步问题

django, multi-databases (writer, read-reploicas) and a sync issue

所以...作为对 API 电话的回应,我这样做:

i = CertainObject(paramA=1, paramB=2)
i.save()

现在我的作家数据库有一条新记录。

处理可能需要一些时间,我不想推迟对 API 调用者的响应,所以下一行我将使用 Celery 将对象 ID 传输到异步作业:

run_async_job.delay(i.id)

立即,或几秒钟后,具体取决于队列 run_async_job 尝试使用提供的 ID 从数据库中加载记录。这是一场赌博。有时有效,有时无效取决于只读副本是否更新。

是否有模式可以保证成功而不必在阅读前“睡”几秒钟或希望好运?

谢谢。

本文深入探讨了如何处理复制数据库的写后读问题:https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638

和作者一样,我知道没有万无一失的方法来处理先写后读的不一致性。

我之前使用的主要策略是使用某种 expect_and_get(pk, max_attempts=10, delay_seconds=5) 方法尝试获取记录,并尝试 max_attempts 次,延迟 delay_seconds 秒尝试之间。这个想法是它“期望”记录存在,因此它将一定数量的故障视为只是暂时的数据库问题。它比只休眠一段时间更可靠,因为它会更快地获取记录,并希望延迟作业执行的频率大大降低。

另一种策略是通过特殊的 save_to_read 方法延迟 returning 直到只读副本具有值,通过以某种方式将新值同步推送到只读副本或仅轮询它们直到他们 return 记录。这种方式似乎有点黑客 IMO。

对于您的大量读取,您可能不必担心写入后读取的一致性:

If we’re rendering the name of the enterprise a user is part of, it’s really not that big a deal if in the incredibly rare occasion that an admin changes it, it takes a minute to have the change propagate to the enterprise’s users.

最简单的解决方案是捕获任务开始时抛出的任何 DoesNotExist 错误,然后安排重试。这可以通过将 run_async_job 转换为 Bound Task:

来完成
@app.task(bind=True)
def run_async_job(self, object_id):
    try:
        instance = CertainObject.objects.get(id=object_id)
    except CertainObject.DoesNotExist:
        return self.retry(object_id)

最简单的方法似乎是使用 Greg 和 Elrond 在他们的回答中提到的重试。如果您使用 shared_task 或 @app.task 装饰器,则可以使用以下代码片段。

@shared_task(bind=True)
def your_task(self, certain_object_id):
    try:
        certain_obj = CertainObject.objects.get(id=certain_object_id)
        # Do your stuff
    except CertainObject.DoesNotExist as e:
        self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)

我在每次重试之间使用了指数倒计时。大家可以根据自己的需要进行修改。

您可以找到自定义重试延迟的文档 here。 还有另一个文档解释了这个 link

中的指数退避

当您调用重试时,它会使用相同的任务 ID 发送一条新消息,并且会注意确保将消息传递到与原始任务相同的队列。您可以在文档 here

中阅读更多相关信息

既然写入然后立即加载它是一个高优先级,那么为什么不将它存储在基于内存的数据库中,例如 Memcache 或 Redis。因此,一段时间后,您可以使用 celery 中的定期作业将其写入数据库,这将 运行 让我们说每分钟左右。完成写入数据库后,它将从 Redis/Memcache.

中删除键

您可以将数据保存在基于内存的数据库中一定时间,比如说最需要数据的时候 1 小时。您还可以创建一个服务方法,它将检查数据是否在内存中。

Django Redis 是连接到 redis 的一个很好的包(如果你在 Celery 中使用它作为代理)。

我提供了一些基于 Django 缓存的示例:

# service method

from django.core.cache import cache

def get_object(obj_id, model_cls):
    obj_dict = cache.get(obj_id, None)  # checks if obj id is in cache, O(1) complexity
    if obj_dict:
       return model_cls(**obj_dict)
    else:
       return model_cls.objects.get(id=obj_id)


# celery job

@app.task
def store_objects():
    logger.info("-"*25)
    # you can use .bulk_create() to reduce DB hits and faster DB entries
    for obj_id in cache.keys("foo_*"):
        CertainObject.objects.create(**cache.get(obj_id))
        cache.delete(obj_id)
    logger.info("-"*25)