List<List<T>> 作为参数并行发送到 Func 并且 Func 执行多次导致一些列表被复制而其他列表被跳过
List<List<T>> sent to Func as parameter in parallel and Func executed multiple times results in some of the lists being duplicated and others skipped
我们有一个方法 Execute
可以像这样并行调用
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
我们有
protected Task<string> Execute(HttpRequestType request, IEnumerable<TItem> ids)
{
return SomeFunction(() => CreateRequest(request, ids.ToList()));
}
CreateRequest(request, ids.ToList()) returns a HttpRequestMessage
and SomeFunction
takes a Func<HttpRequestMessage>
.
和
private async Task<T> SomeFunction(Func<HttpRequestMessage> func)
{
var request = func();
var retryCount = 0;
T result = null;
for (; retryCount < MaxRetries; retryCount++)
{
try
{
result = DoSomethingWithRequest(request);
if(result != null) break;
}
catch
{
//log here
}
finally
{
request = func();
}
}
return result;
}
现在我们已经看到,当我们在 ListOfLists
中有超过 10 个列表时(所以只有 10 个在任何时候执行,其余的等待)并且 DoSomethingWithRequest
间歇性地失败了几次,一些对 SomeFunction
的调用被重复,一些 ID 列表被删除。上面的代码中有什么东西导致了这个吗?
请原谅not-so-descriptive标题。
谢谢,
席德.
编辑:
private HttpRequestMessage CreateRequest(HttpRequestType request, List<string> ids)
{
if (request == null) return null;
request.SomeProperty = toList;
return ConvertoToHttpRequestMessage(request); //This just does some serialization and adds a fresh request Id and headers
}
我没看到你从哪里开始并行执行。
如果您使用 System.Parallel
,您可以发送 ParallelOptions
。这是 Parallel.ForEach()
的示例调用
List<string> myList = new List<string>();
Parallel.ForEach(
myList,
new ParallelOptions()
{
MaxDegreeOfParallelism = 1337 // Here we allow 1337 parallel executions
},
(i) => { /* do something */ });
您传递给 Execute
函数的 request
实例似乎存在竞争条件。
// The request instance must be created before this line, and you're passing the same
// instance to each call of Execute.
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
CreateRequest
不是在创建 HttpRequestType
的新实例,它只是修改传递给 Execute
的实例。由于每个线程 Execute
都在 HttpRequestType
的同一个实例上运行,因此它们只是相互覆盖。
因此,可能会发生以下情况:
Thread A
启动,ids
参数等于[1, 2, 3]
。 Thread A
输入 SomeFunction
并调用 func
。为 Thread A
捕获的 toList
参数是 [1, 2, 3]
,因此 request.SomeProperty
设置为 [1, 2, 3]
,然后使用 [1, 2, 3]
创建 HttpRequest
在 header 的某个地方然后返回到 SomeFunction
。 DoSomethingWithRequestFails
对于 Thread A
。
同时,Thread B
已经开始。 ids
参数等于 [4, 5, 6]
。 Thread B
输入 SomeFunction
并调用 func
。 Thread B
将 request.SomeProperty
设置为 [4, 5, 6]
,然后调用 ConvertToHttpRequestMessage
。
现在,在 Thread B
有机会创建 HttpRequest
之前,Thread A
(失败)进入 SomeFunction
中的 finally
块并调用func
再次。 Thread A
将 request.SomeProperty
设置回 [1, 2, 3]
并且由于 Thread A
和 Thread B
都在变异同一个 HttpRequestType
实例,Thread B
现在有[1, 2, 3]
也在 request.SomeProperty
中。
Thread A
和 Thread B
都在 header 中创建了一个 HttpRequest
和 [1, 2, 3]
。 ID 列表 [1, 2, 3]
重复,列表 [4, 5, 6]
从未发送。
尝试将 toList
从 CreateRequest
传递给 ConvertToHttpRequestMessage
而不仅仅是 HttpRequestType
,或者为每次调用创建一个新的 HttpRequestType
实例Execute
。
我们有一个方法 Execute
可以像这样并行调用
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
我们有
protected Task<string> Execute(HttpRequestType request, IEnumerable<TItem> ids)
{
return SomeFunction(() => CreateRequest(request, ids.ToList()));
}
CreateRequest(request, ids.ToList()) returns a HttpRequestMessage
and SomeFunction
takes a Func<HttpRequestMessage>
.
和
private async Task<T> SomeFunction(Func<HttpRequestMessage> func)
{
var request = func();
var retryCount = 0;
T result = null;
for (; retryCount < MaxRetries; retryCount++)
{
try
{
result = DoSomethingWithRequest(request);
if(result != null) break;
}
catch
{
//log here
}
finally
{
request = func();
}
}
return result;
}
现在我们已经看到,当我们在 ListOfLists
中有超过 10 个列表时(所以只有 10 个在任何时候执行,其余的等待)并且 DoSomethingWithRequest
间歇性地失败了几次,一些对 SomeFunction
的调用被重复,一些 ID 列表被删除。上面的代码中有什么东西导致了这个吗?
请原谅not-so-descriptive标题。
谢谢, 席德.
编辑:
private HttpRequestMessage CreateRequest(HttpRequestType request, List<string> ids)
{
if (request == null) return null;
request.SomeProperty = toList;
return ConvertoToHttpRequestMessage(request); //This just does some serialization and adds a fresh request Id and headers
}
我没看到你从哪里开始并行执行。
如果您使用 System.Parallel
,您可以发送 ParallelOptions
。这是 Parallel.ForEach()
List<string> myList = new List<string>();
Parallel.ForEach(
myList,
new ParallelOptions()
{
MaxDegreeOfParallelism = 1337 // Here we allow 1337 parallel executions
},
(i) => { /* do something */ });
您传递给 Execute
函数的 request
实例似乎存在竞争条件。
// The request instance must be created before this line, and you're passing the same
// instance to each call of Execute.
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);
CreateRequest
不是在创建 HttpRequestType
的新实例,它只是修改传递给 Execute
的实例。由于每个线程 Execute
都在 HttpRequestType
的同一个实例上运行,因此它们只是相互覆盖。
因此,可能会发生以下情况:
Thread A
启动,ids
参数等于[1, 2, 3]
。 Thread A
输入 SomeFunction
并调用 func
。为 Thread A
捕获的 toList
参数是 [1, 2, 3]
,因此 request.SomeProperty
设置为 [1, 2, 3]
,然后使用 [1, 2, 3]
创建 HttpRequest
在 header 的某个地方然后返回到 SomeFunction
。 DoSomethingWithRequestFails
对于 Thread A
。
同时,Thread B
已经开始。 ids
参数等于 [4, 5, 6]
。 Thread B
输入 SomeFunction
并调用 func
。 Thread B
将 request.SomeProperty
设置为 [4, 5, 6]
,然后调用 ConvertToHttpRequestMessage
。
现在,在 Thread B
有机会创建 HttpRequest
之前,Thread A
(失败)进入 SomeFunction
中的 finally
块并调用func
再次。 Thread A
将 request.SomeProperty
设置回 [1, 2, 3]
并且由于 Thread A
和 Thread B
都在变异同一个 HttpRequestType
实例,Thread B
现在有[1, 2, 3]
也在 request.SomeProperty
中。
Thread A
和 Thread B
都在 header 中创建了一个 HttpRequest
和 [1, 2, 3]
。 ID 列表 [1, 2, 3]
重复,列表 [4, 5, 6]
从未发送。
尝试将 toList
从 CreateRequest
传递给 ConvertToHttpRequestMessage
而不仅仅是 HttpRequestType
,或者为每次调用创建一个新的 HttpRequestType
实例Execute
。