django concurrent bulk_create 没有将所有值插入数据库

django concurrent bulk_create not inserting all values to database

我正在使用 django 的 bulk_create 将大量数据存储到 postgresql 数据库。这是在芹菜任务中发生的。根据不同的参数值立即调用此任务的许多实例。有 4 个不同的工作人员 运行 每个任务实例并行。 当只有一个任务实例 运行 时,所有值都被添加到数据库中,但是当多个实例通过不同的 worker 运行 连接在一起时,只有一些值被插入

在为 bulk_create 创建模型对象列表时,我添加了 print 以交叉检查值是否已插入到列表中。我发现该值总是添加到列表中,但在 bulk_create 之后,相同的值不会反映在数据库中。 我通过添加 time.sleep 来添加延迟和 bulk_create 中的 batch_size 参数,部分解决了这个问题。但是这个方案不太理想,数据变多了,也解决不了问题

我不能post确切的代码,但它是这样的: * 芹菜任务:

def some_function():
   param_lst=[1,3,..] #many values
   all_task=[]
   for i in param_lst:
      all_task.append(some_task1.si(i)) #some_task1 and some_task2 are using majorly similar code, only some data processing changes. both use bulk_create to store values
      all_task.append(some_task2.si(i))
   ch = chord( group( all_task )).set(queue="someQueue")(some_funct) #this is how the tasks are invoked

   if ch.ready():
       ch.get()

任务函数定义

@someapp.task
def some_task1(self,i):
  #process/modify data based on param i
  #we end up with json and iterate over it
  batch = []
  for k in someDict:
    result=MyModel(val1=something,...) #all values inside model
    batch.append(result)
  MyModel.objects.bulk_create(batch) #also tried with batch_size parameter

其他任务类似,只是数据修改有一些变化。它使用与 bulk_create 相同的方式来存储值

没有抛出任何错误消息。当我 运行 仅针对一个参数 bulk_create 存储所有值时,但当任务是 运行 并行参数列表时,它将错过向数据库插入一些值。 我在网上搜索时找不到这样的东西。 我对数据库概念不是很好,对芹菜也很陌生。如果我遗漏了什么或做错了什么,请告诉我

尝试在您的 bulk_create 周围使用交易。根据后端的不同,它可能不是事务性操作。

from django.db import transaction

@someapp.task
@transaction.atomic
def some_task1(self,i):
  #process/modify data based on param i
  #we end up with json and iterate over it
  batch = []
  for k in someDict:
    result=MyModel(val1=something,...) #all values inside model
    batch.append(result)
  MyModel.objects.bulk_create(batch) #also tried with batch_size parameter