Guid.NewGuid() 在 Parallel.For 循环中使用时返回重复值

Guid.NewGuid() is returning duplicate values when used in a Parallel.For loop

我有一个应用程序遇到了 API。因此,它会查询对象中的所有 ID,然后必须针对每个 ID 一次查询一个项目。我在 Parallel.For 循环中执行此操作,并将每个项目数据添加到数据 table 中的一行。然后我使用 sqlbulkcopy 将数据 table 发送到 SQL 服务器 table。

如果我在不使用 Parallel.For 的情况下执行此操作,效果会很好。但是,对于 Parallel.For,这一行:

workrow["id"] = Guid.NewGuid();

正在生成重复的 Guid。它经常这样做并导致数据无法加载到 SQL 服务器 table 中,因为 SQL 中的 id 行是主键并且不允许重复。我试过锁定:

                    lock (lockobject)
                    {
                        workrow["id"] = Guid.NewGuid();
                    }

这没有帮助。
我尝试不为该字段分配 id,希望 SQL 会生成它(它在该字段上确实有 newid() )。那没有说它不能插入空值。 我似乎不能只从数据中删除 id 字段table 因为当我执行 sqlbulkcopy 时列不对齐。

有人可以帮我吗?我要么需要弄清楚如何让 Guid.NewGuid() 停止生成重复项,要么我需要弄清楚一种不传递 id(始终是数据 table 中的第一个字段)的方法,以便SQL 将生成 id。

这是我用来生成 table 之一的代码:

        public static DataTable MakeWorkflowTable()
        {
            DataTable Workflow = new DataTable("Workflow");
            DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
            Workflow.Columns.Add(id);
            DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
            Workflow.Columns.Add(OrgInfoID);
            DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Name);
            DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Active);
            DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Description);
            DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Object);
            DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
            Workflow.Columns.Add(Formula);
            DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
            Workflow.Columns.Add(ManageableState);
            DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
            Workflow.Columns.Add(NameSpacePrefix);
            DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
            Workflow.Columns.Add(TDACount);
            DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
            Workflow.Columns.Add(TriggerType);
            DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
            Workflow.Columns.Add(CreatedDate);
            DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
            Workflow.Columns.Add(CreatedBy);
            DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
            Workflow.Columns.Add(LastModifiedDate);
            DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
            Workflow.Columns.Add(LastModifiedBy);
            return Workflow;
        }

这是我用来将它发送到 SQL 服务器的代码:

        public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
        {
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
            {
                bulkCopy.DestinationTableName =
                    TableName;
                try
                {
                    bulkCopy.WriteToServer(dt);
                    dt.Clear();
                }
                catch (Exception e)
                {
                    logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
                    if (e.Message.ToString().Contains("PRIMARY KEY"))
                    {
                        foreach(DataRow row in dt.Rows)
                        {
                            logger.Warn("ID: {id}", row["id"]);
                        }
                    }
                }
            }

        }

正如您在 catch 语句中看到的那样,我将其设置为将 ID 写到日志中,这样我就可以自己查看它们,果然那里有一个重复项。太令人沮丧了!如果不需要的话,我真的不想取出 Parallel.For 和单线程。

根据请求,这里是带有 Parallel.For

的代码
              if (qr.totalSize > 0)
                {
                    object lockobject = new object();
                    Parallel.For(0, qr.records.Length, i =>
                    {
                        ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();

                        vr = (ToolingService.CustomTab1)qr.records[i];
                        string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
                            + "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
                        ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
                        ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
                        mdqr = ts.query(mdSOQL);
                        vrmd = (ToolingService.CustomTab1)mdqr.records[0];

                        DataRow workrow = CustomTabs.NewRow();
                        lock (lockobject)
                        {
                            workrow["id"] = Guid.NewGuid();
                        }
                        workrow["OrgInfoID"] = _orgDBID;
                        workrow["FullName"] = vrmd.FullName;
                        workrow["Description"] = vrmd.Description ?? Convert.DBNull;
                        workrow["ManageableState"] = vrmd.ManageableState;
                        workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
                        workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
                        workrow["Type"] = vrmd.Type ?? Convert.DBNull;
                        workrow["URL"] = vrmd.Url ?? Convert.DBNull;
                        workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
                        if (vrmd.CreatedBy == null)
                        {
                            workrow["CreatedBy"] = Convert.DBNull;
                        }
                        else
                        {
                            workrow["CreatedBy"] = vrmd.CreatedBy.Name;
                        }
                        workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
                        if (vrmd.LastModifiedBy == null)
                        {
                            workrow["LastModifiedBy"] = Convert.DBNull;
                        }
                        else
                        {
                            workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
                        }
                        lock (CustomTabs)
                        {
                            CustomTabs.Rows.Add(workrow);
                        }

                    });
                    OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");

我以前见过这个问题。 Guid.NewGuid() 没有问题,但是 DataTable 不是线程安全的!

DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved).

Thread safety for DataTable

还相关:c# DataGridView DataTable internal index corrupted in parallel loop

It's done in a loop because I have to query the API over and over to get each item in the object. I'm doing the Parallel.For loop so I can speed up the process. If I need to get 450 items or more one at a time, I'd like to multithread that for speed purposes. The database access isn't being done in the loop, just building the datatable is because once I get the data back from the API I need to store it.

您可以创建一个类型并将它们以多线程方式添加到 ConcurrentBag<T>ConcurrentQueue<T>(或另一个并发集合,请参阅 MS Docs)——它们是线程安全的 :)

然后您可以使用单线程构建 DataTable。或者,如果您的用例可能的话,可以跳过整个数据表。

问题在于,必须在 DataTable 的 Parallel.ForEach 中使用 lock,这首先违背了使用 Parallel.ForEach 的目的;但是,令我惊讶的是,当您调用 DataRow workrow = CustomTabs.NewRow(); 时您没有得到异常,因为在我的测试中,我得到了一个索引损坏的异常。实际上,我不得不将对 NewRow 的调用包装在一个锁中。像这样:

Parallel.ForEach(data, x =>
            {
                DataRow row = null;
                lock (lockRow)
                {
                    row = dt.NewRow();
                    row["Guid"] = Guid.NewGuid();
                }
...
                lock(lockObj)
                   dt.Rows.Add(row);

其中 lockObjlockRow 是实例化为

的 2 个单独的静态对象
static  object lockObj = new  object();
static  object lockRow = new object();

这对我有用,将 100 万行添加到 DataTable 并确保所有 Guid 都是唯一的。

综上所述,我强烈建议按照 Julian 的建议编写代码或创建一个实现 IDataReader 的 class(您可以将其与 SQLBulkCopy 一起使用)并使用上传数据那。

我使用了每次访问时都锁定的 guidgenerator。


lock (guidGenerator)
{
  entity.Id = guidGenerator.NewGuid();
}

对我来说效果很好。不过这是一种不同的方法。