如何为 bulk_create 缓存单个 Django REST API POST?
How to cache individual Django REST API POSTs for bulk_create?
我有一个 Django REST API 端点。它接收 JSON 有效负载,例如
{ "data" : [0,1,2,3] }
这在 views.py 函数中解码并生成一个新的数据库对象,如下所示(伪代码):
newobj = MyObj.(col0 = 0, col1= 1, col2 = 2, col3 = 3)
newobj.save()
在测试中,创建 x1000 newobjs 列表然后进行批量创建的速度提高了 20 倍:
Myobj.objects.bulk_create(newobjs, 1000)
所以,问题是当我们有 1000 个时,如何在 Django 中的某个地方保存单个 POST 以准备批量写入?
例如,您可以使用 Memcached 或 Redis 对其进行缓存。
但是您将需要编写某种服务来检查缓存中有多少新项目以及是否超过例如。 1000 -> 插入它。
所以:
- POST 正在填充缓存
- 服务从缓存中获取新项目,然后将它们插入到持久性数据库中。
你真的需要吗?
如果数据已经存在会怎样?如果数据损坏?用户如何知道这一点?
save individual POSTs somewhere in Django ready for batch writes when we have 1000 of them
可以,
- 使用django's cache framework,
- 使用 python 的 csv module
维护一个 CSV
文件
- 您可能希望保持 post 的顺序,因此您可以使用 persist-queue 包。
但是正如维克多也提到的,为什么?为什么您如此关心 SQL Insert
的速度,反正已经相当快了?
当然,bulk_create
比 快得多 ,因为它需要对您的数据库服务器进行一次网络调用,然后将所有行添加到一个 SQL transaction 中,但是只有当您实际上有一堆数据要加在一起时才有意义使用它。 - 最后,您必须将数据保存在某处,这将花费一些处理时间。
因为你的方法有很多缺点:
- 您有丢失数据的风险
- 您将无法达到
UNIQUE
或您的 table 的任何其他限制。
- 您的用户不会在创建 post 时获得即时反馈。
- 如果 post 未存储在您的主数据库中,您将无法 show/access 以有用的方式使用它们。
编辑
使用像 Redis 这样的快速缓存来维护条目列表,在你的 api_view
中你可以调用 cache.get
来获取当前列表,将对象附加到它然后调用 cache.set
更新它。在此之后添加一个检查,无论何时 len(list) >= 1000 == True
调用 bulk_create
。对于如此大量的数据,您可能还想考虑使用 Elasticsearch。
感谢以上回复,答案包括一些建议的内容,但是是一个超集,所以这里是一个摘要。
这实际上是关于创建 FIFO。 memcached turns out to be unsuitable (after trying) because only redis has a list function that enables this, explained nicely here.
另请注意Django内置缓存不支持redis列表api调用。
所以我们需要一个新的docker-compose.yml入口来添加redis:
redis:
image: redis
ports:
- 6379:6379/tcp
networks:
- app-network
然后在views.py我们添加:(注意redis的使用rpush)
import redis
...
redis_host=os.environ['REDIS_HOST']
redis_port = 6379
redis_password = ""
r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)
...
def write_post_to_redis(request):
payload = json.loads(request.body)
r.rpush("df",json.dumps(payload))
所以这会将接收到的有效载荷推送到 redis 内存缓存中。我们现在需要读取(或弹出)它并写入 postgres 数据库。所以我们需要一个每n秒唤醒一次并检查的进程。为此,我们需要 Django background_task。首先,安装它:
pipenv install django-background-tasks
并添加到 settings.py
的已安装应用
INSTALLED_APPS = [
...
'background_task',
然后运行迁移添加后台任务表:
python manage.py migrate
现在在views.py,添加:
from background_task import background
from background_task.models import CompletedTask
并添加将缓存数据写入 postgres 数据库的函数,注意装饰器声明它应该每 5 秒在后台 运行。还要注意使用redis lpop.
@background(schedule=5)
def write_cached_samples():
...
payload = json.loads(r.lpop('df'))
# now do your write of payload to postgres
... and delete the completed tasks or we'll have a big db leak
CompletedTask.objects.all().delete()
为了启动进程,在 urls.py 的基础上添加以下内容:
write_cached_samples(repeat=10, repeat_until=None)
最后,因为后台任务需要一个单独的进程,所以我们在docker-compose.yml中复制了django docker容器,但将asgi服务器运行命令替换为后台进程 运行 命令。
django_bg:
image: my_django
command: >
sh -c "python manage.py process_tasks"
...
总而言之,我们添加了两个新的 docker 容器,一个用于 redis 内存缓存,另一个用于 运行 django 后台任务。我们使用 redis lists rpush 和 lpop 函数来使用 API 接收推送和后台任务弹出创建一个 FIFO。
有一个 nginx 连接到错误的 django 容器的小问题,通过停止并重新启动后台容器来纠正,一些 docker 网络路由初始化错误的问题。
接下来我将 Django HTTP API 端点替换为 Go 端点以查看我们获得了多少加速,因为 Daphne ASGI 服务器仅在 100 个请求时达到最大值 CPU每秒
我有一个 Django REST API 端点。它接收 JSON 有效负载,例如
{ "data" : [0,1,2,3] }
这在 views.py 函数中解码并生成一个新的数据库对象,如下所示(伪代码):
newobj = MyObj.(col0 = 0, col1= 1, col2 = 2, col3 = 3)
newobj.save()
在测试中,创建 x1000 newobjs 列表然后进行批量创建的速度提高了 20 倍:
Myobj.objects.bulk_create(newobjs, 1000)
所以,问题是当我们有 1000 个时,如何在 Django 中的某个地方保存单个 POST 以准备批量写入?
例如,您可以使用 Memcached 或 Redis 对其进行缓存。 但是您将需要编写某种服务来检查缓存中有多少新项目以及是否超过例如。 1000 -> 插入它。
所以:
- POST 正在填充缓存
- 服务从缓存中获取新项目,然后将它们插入到持久性数据库中。
你真的需要吗? 如果数据已经存在会怎样?如果数据损坏?用户如何知道这一点?
save individual POSTs somewhere in Django ready for batch writes when we have 1000 of them
可以,
- 使用django's cache framework,
- 使用 python 的 csv module 维护一个
- 您可能希望保持 post 的顺序,因此您可以使用 persist-queue 包。
CSV
文件
但是正如维克多也提到的,为什么?为什么您如此关心 SQL Insert
的速度,反正已经相当快了?
当然,bulk_create
比 快得多 ,因为它需要对您的数据库服务器进行一次网络调用,然后将所有行添加到一个 SQL transaction 中,但是只有当您实际上有一堆数据要加在一起时才有意义使用它。 - 最后,您必须将数据保存在某处,这将花费一些处理时间。
因为你的方法有很多缺点:
- 您有丢失数据的风险
- 您将无法达到
UNIQUE
或您的 table 的任何其他限制。 - 您的用户不会在创建 post 时获得即时反馈。
- 如果 post 未存储在您的主数据库中,您将无法 show/access 以有用的方式使用它们。
编辑
使用像 Redis 这样的快速缓存来维护条目列表,在你的 api_view
中你可以调用 cache.get
来获取当前列表,将对象附加到它然后调用 cache.set
更新它。在此之后添加一个检查,无论何时 len(list) >= 1000 == True
调用 bulk_create
。对于如此大量的数据,您可能还想考虑使用 Elasticsearch。
感谢以上回复,答案包括一些建议的内容,但是是一个超集,所以这里是一个摘要。
这实际上是关于创建 FIFO。 memcached turns out to be unsuitable (after trying) because only redis has a list function that enables this, explained nicely here.
另请注意Django内置缓存不支持redis列表api调用。
所以我们需要一个新的docker-compose.yml入口来添加redis:
redis:
image: redis
ports:
- 6379:6379/tcp
networks:
- app-network
然后在views.py我们添加:(注意redis的使用rpush)
import redis
...
redis_host=os.environ['REDIS_HOST']
redis_port = 6379
redis_password = ""
r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)
...
def write_post_to_redis(request):
payload = json.loads(request.body)
r.rpush("df",json.dumps(payload))
所以这会将接收到的有效载荷推送到 redis 内存缓存中。我们现在需要读取(或弹出)它并写入 postgres 数据库。所以我们需要一个每n秒唤醒一次并检查的进程。为此,我们需要 Django background_task。首先,安装它:
pipenv install django-background-tasks
并添加到 settings.py
的已安装应用INSTALLED_APPS = [
...
'background_task',
然后运行迁移添加后台任务表:
python manage.py migrate
现在在views.py,添加:
from background_task import background
from background_task.models import CompletedTask
并添加将缓存数据写入 postgres 数据库的函数,注意装饰器声明它应该每 5 秒在后台 运行。还要注意使用redis lpop.
@background(schedule=5)
def write_cached_samples():
...
payload = json.loads(r.lpop('df'))
# now do your write of payload to postgres
... and delete the completed tasks or we'll have a big db leak
CompletedTask.objects.all().delete()
为了启动进程,在 urls.py 的基础上添加以下内容:
write_cached_samples(repeat=10, repeat_until=None)
最后,因为后台任务需要一个单独的进程,所以我们在docker-compose.yml中复制了django docker容器,但将asgi服务器运行命令替换为后台进程 运行 命令。
django_bg:
image: my_django
command: >
sh -c "python manage.py process_tasks"
...
总而言之,我们添加了两个新的 docker 容器,一个用于 redis 内存缓存,另一个用于 运行 django 后台任务。我们使用 redis lists rpush 和 lpop 函数来使用 API 接收推送和后台任务弹出创建一个 FIFO。
有一个 nginx 连接到错误的 django 容器的小问题,通过停止并重新启动后台容器来纠正,一些 docker 网络路由初始化错误的问题。
接下来我将 Django HTTP API 端点替换为 Go 端点以查看我们获得了多少加速,因为 Daphne ASGI 服务器仅在 100 个请求时达到最大值 CPU每秒