List<List<T>> 作为参数并行发送到 Func 并且 Func 执行多次导致一些列表被复制而其他列表被跳过

List<List<T>> sent to Func as parameter in parallel and Func executed multiple times results in some of the lists being duplicated and others skipped

我们有一个方法 Execute 可以像这样并行调用

    ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);

我们有

    protected Task<string> Execute(HttpRequestType request, IEnumerable<TItem> ids)
    {
        return SomeFunction(() => CreateRequest(request, ids.ToList()));
    }

CreateRequest(request, ids.ToList()) returns a HttpRequestMessage and SomeFunction takes a Func<HttpRequestMessage>.

    private async Task<T> SomeFunction(Func<HttpRequestMessage> func)
    {
        var request = func();
        var retryCount = 0;
        T result = null;
        for (; retryCount < MaxRetries; retryCount++)
        {
            try
            {
                result = DoSomethingWithRequest(request);
                if(result != null) break;
            }
            catch
            {
                //log here     
            }
            finally
            {
                request = func();    
            }
        }
        return result;
    }

现在我们已经看到,当我们在 ListOfLists 中有超过 10 个列表时(所以只有 10 个在任何时候执行,其余的等待)并且 DoSomethingWithRequest 间歇性地失败了几次,一些对 SomeFunction 的调用被重复,一些 ID 列表被删除。上面的代码中有什么东西导致了这个吗?

请原谅not-so-descriptive标题。

谢谢, 席德.

编辑:

    private HttpRequestMessage CreateRequest(HttpRequestType request, List<string> ids)
    {
        if (request == null) return null;
        request.SomeProperty = toList;
        return ConvertoToHttpRequestMessage(request); //This just does some serialization and adds a fresh request Id and headers
    }

我没看到你从哪里开始并行执行。

如果您使用 System.Parallel,您可以发送 ParallelOptions。这是 Parallel.ForEach()

的示例调用
List<string> myList = new List<string>();

Parallel.ForEach(
    myList,
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 1337 // Here we allow 1337 parallel executions
    },
    (i) => { /* do something */ });

您传递给 Execute 函数的 request 实例似乎存在竞争条件。

// The request instance must be created before this line, and you're passing the same
// instance to each call of Execute.
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);

CreateRequest 不是在创建 HttpRequestType 的新实例,它只是修改传递给 Execute 的实例。由于每个线程 Execute 都在 HttpRequestType 的同一个实例上运行,因此它们只是相互覆盖。

因此,可能会发生以下情况:

Thread A启动,ids参数等于[1, 2, 3]Thread A 输入 SomeFunction 并调用 func。为 Thread A 捕获的 toList 参数是 [1, 2, 3],因此 request.SomeProperty 设置为 [1, 2, 3],然后使用 [1, 2, 3] 创建 HttpRequest在 header 的某个地方然后返回到 SomeFunctionDoSomethingWithRequestFails 对于 Thread A

同时,Thread B已经开始。 ids 参数等于 [4, 5, 6]Thread B 输入 SomeFunction 并调用 funcThread Brequest.SomeProperty 设置为 [4, 5, 6],然后调用 ConvertToHttpRequestMessage

现在,在 Thread B 有机会创建 HttpRequest 之前,Thread A(失败)进入 SomeFunction 中的 finally 块并调用func 再次。 Thread Arequest.SomeProperty 设置回 [1, 2, 3] 并且由于 Thread AThread B 都在变异同一个 HttpRequestType 实例,Thread B 现在有[1, 2, 3] 也在 request.SomeProperty 中。

Thread AThread B 都在 header 中创建了一个 HttpRequest[1, 2, 3]。 ID 列表 [1, 2, 3] 重复,列表 [4, 5, 6] 从未发送。

尝试将 toListCreateRequest 传递给 ConvertToHttpRequestMessage 而不仅仅是 HttpRequestType,或者为每次调用创建一个新的 HttpRequestType 实例Execute