创建 IEnumerable<T> 的副本以修改来自不同线程的集合?

Create a copy of IEnumerable<T> to modify collection from different threads?

我正在使用线程方数据模型,该模型使用它的自定义数据模型。数据模型的层次结构如下:
型号
---Tables(Table 的类型)
-----行数(行的类型)
------细胞(细胞类型)

Table 有 属性 行,就像 DataTable 一样,我必须在多个任务中访问此 属性。现在我需要 table 中的一行,它的列值是指定值。

为此,我创建了一个方法,该方法具有锁定语句以使其只能从一个线程访问一次。

public static Row GetRowWithColumnValue(Model model, string tableKey, string indexColumnKey, string indexColumnValue)
{
    Row simObj = null;
    lock (syncRoot)
    {
        SimWrapperFromValueFactory wrapperSimSystem = new SimWrapperFromValueFactory(model, tableKey, indexColumnKey);
        simObj = wrapperSimSystem.GetWrapper(indexColumnValue);
    }
    return simObj;
}

为了为 Table 中的其中一列创建查找,我创建了一个方法,该方法始终尝试创建行的副本以避免集合修改异常:

Private Function GetTableRows(table As Table) As List(Of Row)
    Dim rowsList As New List(Of Row)(table.Rows)  'Case 1
    'rowsList.AddRange(table.Rows) 'Case 2
    ' Case 3
    'For i As Integer = 0 To table.Rows.Count - 1
    'rowsList.Add(table.Rows.ElementAt(i))
    'Next
    Return rowsList
End Function

但其他线程可以修改 table(例如添加、删除行或更新任何行中的列值)。我低于 "Collection modified exception":

at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)
   at System.Collections.Generic.List`1.Enumerator.MoveNextRare()
   at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)

我无法将此第三方库修改为并发集合以及在多个项目之间共享的同一数据模型。

问题: 我正在寻找解决方案,让我允许多个读者阅读这个集合,或者它在另一个线程中修改。是否有可能在没有的情况下获取集合的副本出现异常??

参考了下面的 SO threads 但没有找到确切的解决方案:
Lock vs. ToArray for thread safe foreach access of List collection
Can ToArray() throw an exception?
Is returning an IEnumerable<> thread-safe?

最简单的解决方案是重试异常,如下所示:

private List<Row> CopyVolatileList(IEnumerable<Row> original)
{
    while (true)
    {
        try
        {
            List<Row> copy = new List<Row>();

            foreach (Row row in original) {
                copy.Add(row);
            }

            // Validate.
            if (copy.Count != 0 && copy[copy.Count - 1] == null) // Assuming Row is a reference type.
            {
                // At least one element was removed from the list while were copying.
                continue;
            }

            return copy;
        }
        catch (InvalidOperationException)
        {
            // Check ex.Message?
        }

        // Keep trying.
    }
}

最终你会得到一个运行,其中没有抛出异常并且数据完整性验证通过。

或者,您可以深入研究(我的意思是非常,非常 深)。

免责声明:永远不要在生产中使用它。除非你走投无路,真的别无选择。

因此我们确定您使用的是自定义 collection (TableRowCollection),它最终使用 List<Row>.Enumerator 遍历行。这强烈表明您的 collection 得到了 List<Row>.

的支持

首先,您需要获得对该列表的引用。您的 collection 不会公开它,因此您需要 fiddle 一点。您将需要使用反射来查找和获取支持列表的值。我建议在调试器中查看 TableRowCollection。它会向您显示 non-public 个成员,您将知道要反映什么。

如果找不到您的 List<Row>,请仔细查看 TableRowCollection.GetEnumerator() - 特别是 GetEnumerator().GetType()。如果 returns List<Row>.Enumerator,那么宾果游戏:我们可以从中获取支持列表,如下所示:

List<Row> list;

using (IEnumerator<Row> enumerator = table.GetEnumerator())
{
    list = (List<Row>)typeof(List<Row>.Enumerator)
        .GetField("list", BindingFlags.Instance | BindingFlags.NonPublic)
        .GetValue(enumerator);
}

如果上述获取您的 List<Row> 的方法均失败,则无需继续阅读。你还不如放弃。

万一你成功了,现在你有了支持 List<Row>,我们将不得不看看 Reference Source for List<T>

我们看到使用了 3 个字段:

private T[] _items;
private int _size; // Accessible via "Count".
private int _version;

我们的目标是将索引介于零和 _size - 1 之间的项目从 _items 数组复制到新数组中,并在 _version 更改之间执行此操作。

关于线程安全的观察:List<T> 不使用锁,none 字段标记为 volatile 并且 _version 通过 ++ 递增,而不是 Interlocked.Increment。长话短说,这意味着 不可能 读取所有 3 个字段值并自信地说我们正在查看稳定数据。我们将不得不重复读取字段值,以便 有点 确信我们正在查看一个合理的快照(我们永远不会 100% 有把握,但您可以选择解决"good enough").

using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading;

private Row[] CopyVolatileList(List<Row> original)
{
    while (true)
    {
        // Get _items and _size values which are safe to use in tandem.
        int version = GetVersion(original); // _version.
        Row[] items = GetItems(original); // _items.
        int count = original.Count; // _size.

        if (items.Length < count)
        {
            // Definitely a torn read. Copy will fail.
            continue;
        }

        // Copy.
        Row[] copy = new Row[count];

        Array.Copy(items, 0, copy, 0, count);

        // Stabilization window.
        Thread.Sleep(1);

        // Validate.
        if (version == GetVersion(original)) {
            return copy;
        }

        // Keep trying.
    }
}

static Func<List<Row>, int> GetVersion = CompilePrivateFieldAccessor<List<Row>, int>("_version");
static Func<List<Row>, Row[]> GetItems = CompilePrivateFieldAccessor<List<Row>, Row[]>("_items");

static Func<TObject, TField> CompilePrivateFieldAccessor<TObject, TField>(string fieldName)
{
    ParameterExpression param = Expression.Parameter(typeof(TObject), "o");
    MemberExpression fieldAccess = Expression.PropertyOrField(param, fieldName);

    return Expression
        .Lambda<Func<TObject, TField>>(fieldAccess, param)
        .Compile();
}

注意重新稳定 window:它越大,您越有信心处理的不是撕裂读取(因为列表正在修改所有 3 个字段)。我已经确定了我在测试中不能失败的最小值,我在一个线程上的一个紧密循环中调用 CopyVolatileList,并使用另一个线程将项目添加到列表中,删除它们或清除列表0 到 20 毫秒之间的随机间隔。

如果您删除稳定性 window,您偶尔会在数组末尾获得一个包含未初始化元素的副本,因为另一个线程在您复制时删除了一行 - 这就是需要它的原因。

显然,您应该在构建副本后尽最大努力对其进行验证(至少检查数组末尾的未初始化元素,以防稳定 window 失败)。

祝你好运。