django,多数据库(writer,read-reploicas)和同步问题
django, multi-databases (writer, read-reploicas) and a sync issue
所以...作为对 API 电话的回应,我这样做:
i = CertainObject(paramA=1, paramB=2)
i.save()
现在我的作家数据库有一条新记录。
处理可能需要一些时间,我不想推迟对 API 调用者的响应,所以下一行我将使用 Celery 将对象 ID 传输到异步作业:
run_async_job.delay(i.id)
立即,或几秒钟后,具体取决于队列 run_async_job
尝试使用提供的 ID 从数据库中加载记录。这是一场赌博。有时有效,有时无效取决于只读副本是否更新。
是否有模式可以保证成功而不必在阅读前“睡”几秒钟或希望好运?
谢谢。
本文深入探讨了如何处理复制数据库的写后读问题:https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638。
和作者一样,我知道没有万无一失的方法来处理先写后读的不一致性。
我之前使用的主要策略是使用某种 expect_and_get(pk, max_attempts=10, delay_seconds=5)
方法尝试获取记录,并尝试 max_attempts
次,延迟 delay_seconds
秒尝试之间。这个想法是它“期望”记录存在,因此它将一定数量的故障视为只是暂时的数据库问题。它比只休眠一段时间更可靠,因为它会更快地获取记录,并希望延迟作业执行的频率大大降低。
另一种策略是通过特殊的 save_to_read
方法延迟 returning 直到只读副本具有值,通过以某种方式将新值同步推送到只读副本或仅轮询它们直到他们 return 记录。这种方式似乎有点黑客 IMO。
对于您的大量读取,您可能不必担心写入后读取的一致性:
If we’re rendering the name of the enterprise a user is part of, it’s really not that big a deal if in the incredibly rare occasion that an admin changes it, it takes a minute to have the change propagate to the enterprise’s users.
最简单的解决方案是捕获任务开始时抛出的任何 DoesNotExist
错误,然后安排重试。这可以通过将 run_async_job
转换为 Bound Task
:
来完成
@app.task(bind=True)
def run_async_job(self, object_id):
try:
instance = CertainObject.objects.get(id=object_id)
except CertainObject.DoesNotExist:
return self.retry(object_id)
最简单的方法似乎是使用 Greg 和 Elrond 在他们的回答中提到的重试。如果您使用 shared_task 或 @app.task 装饰器,则可以使用以下代码片段。
@shared_task(bind=True)
def your_task(self, certain_object_id):
try:
certain_obj = CertainObject.objects.get(id=certain_object_id)
# Do your stuff
except CertainObject.DoesNotExist as e:
self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)
我在每次重试之间使用了指数倒计时。大家可以根据自己的需要进行修改。
您可以找到自定义重试延迟的文档 here。
还有另一个文档解释了这个 link
中的指数退避
当您调用重试时,它会使用相同的任务 ID 发送一条新消息,并且会注意确保将消息传递到与原始任务相同的队列。您可以在文档 here
中阅读更多相关信息
既然写入然后立即加载它是一个高优先级,那么为什么不将它存储在基于内存的数据库中,例如 Memcache 或 Redis。因此,一段时间后,您可以使用 celery 中的定期作业将其写入数据库,这将 运行 让我们说每分钟左右。完成写入数据库后,它将从 Redis/Memcache.
中删除键
您可以将数据保存在基于内存的数据库中一定时间,比如说最需要数据的时候 1 小时。您还可以创建一个服务方法,它将检查数据是否在内存中。
Django Redis 是连接到 redis 的一个很好的包(如果你在 Celery 中使用它作为代理)。
我提供了一些基于 Django 缓存的示例:
# service method
from django.core.cache import cache
def get_object(obj_id, model_cls):
obj_dict = cache.get(obj_id, None) # checks if obj id is in cache, O(1) complexity
if obj_dict:
return model_cls(**obj_dict)
else:
return model_cls.objects.get(id=obj_id)
# celery job
@app.task
def store_objects():
logger.info("-"*25)
# you can use .bulk_create() to reduce DB hits and faster DB entries
for obj_id in cache.keys("foo_*"):
CertainObject.objects.create(**cache.get(obj_id))
cache.delete(obj_id)
logger.info("-"*25)
所以...作为对 API 电话的回应,我这样做:
i = CertainObject(paramA=1, paramB=2)
i.save()
现在我的作家数据库有一条新记录。
处理可能需要一些时间,我不想推迟对 API 调用者的响应,所以下一行我将使用 Celery 将对象 ID 传输到异步作业:
run_async_job.delay(i.id)
立即,或几秒钟后,具体取决于队列 run_async_job
尝试使用提供的 ID 从数据库中加载记录。这是一场赌博。有时有效,有时无效取决于只读副本是否更新。
是否有模式可以保证成功而不必在阅读前“睡”几秒钟或希望好运?
谢谢。
本文深入探讨了如何处理复制数据库的写后读问题:https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638。
和作者一样,我知道没有万无一失的方法来处理先写后读的不一致性。
我之前使用的主要策略是使用某种 expect_and_get(pk, max_attempts=10, delay_seconds=5)
方法尝试获取记录,并尝试 max_attempts
次,延迟 delay_seconds
秒尝试之间。这个想法是它“期望”记录存在,因此它将一定数量的故障视为只是暂时的数据库问题。它比只休眠一段时间更可靠,因为它会更快地获取记录,并希望延迟作业执行的频率大大降低。
另一种策略是通过特殊的 save_to_read
方法延迟 returning 直到只读副本具有值,通过以某种方式将新值同步推送到只读副本或仅轮询它们直到他们 return 记录。这种方式似乎有点黑客 IMO。
对于您的大量读取,您可能不必担心写入后读取的一致性:
If we’re rendering the name of the enterprise a user is part of, it’s really not that big a deal if in the incredibly rare occasion that an admin changes it, it takes a minute to have the change propagate to the enterprise’s users.
最简单的解决方案是捕获任务开始时抛出的任何 DoesNotExist
错误,然后安排重试。这可以通过将 run_async_job
转换为 Bound Task
:
@app.task(bind=True)
def run_async_job(self, object_id):
try:
instance = CertainObject.objects.get(id=object_id)
except CertainObject.DoesNotExist:
return self.retry(object_id)
最简单的方法似乎是使用 Greg 和 Elrond 在他们的回答中提到的重试。如果您使用 shared_task 或 @app.task 装饰器,则可以使用以下代码片段。
@shared_task(bind=True)
def your_task(self, certain_object_id):
try:
certain_obj = CertainObject.objects.get(id=certain_object_id)
# Do your stuff
except CertainObject.DoesNotExist as e:
self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)
我在每次重试之间使用了指数倒计时。大家可以根据自己的需要进行修改。
您可以找到自定义重试延迟的文档 here。 还有另一个文档解释了这个 link
中的指数退避当您调用重试时,它会使用相同的任务 ID 发送一条新消息,并且会注意确保将消息传递到与原始任务相同的队列。您可以在文档 here
中阅读更多相关信息既然写入然后立即加载它是一个高优先级,那么为什么不将它存储在基于内存的数据库中,例如 Memcache 或 Redis。因此,一段时间后,您可以使用 celery 中的定期作业将其写入数据库,这将 运行 让我们说每分钟左右。完成写入数据库后,它将从 Redis/Memcache.
中删除键您可以将数据保存在基于内存的数据库中一定时间,比如说最需要数据的时候 1 小时。您还可以创建一个服务方法,它将检查数据是否在内存中。
Django Redis 是连接到 redis 的一个很好的包(如果你在 Celery 中使用它作为代理)。
我提供了一些基于 Django 缓存的示例:
# service method
from django.core.cache import cache
def get_object(obj_id, model_cls):
obj_dict = cache.get(obj_id, None) # checks if obj id is in cache, O(1) complexity
if obj_dict:
return model_cls(**obj_dict)
else:
return model_cls.objects.get(id=obj_id)
# celery job
@app.task
def store_objects():
logger.info("-"*25)
# you can use .bulk_create() to reduce DB hits and faster DB entries
for obj_id in cache.keys("foo_*"):
CertainObject.objects.create(**cache.get(obj_id))
cache.delete(obj_id)
logger.info("-"*25)