如何修复 TSparseArray<T>?
How to fix TSparseArray<T>?
由于 System.Generics.Collections.TArray.Copy<T>
(取决于 already reported bug in System.CopyArray
)的未修复错误,有时会在使用线程库时引发异常。
异常在方法 System.Threading.TSparseArray<T>.Add
:
中引发
function TSparseArray<T>.Add(const Item: T): Integer;
var
I: Integer;
LArray, NewArray: TArray<T>;
begin
...
TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
...
end;
好吧,System.CopyArray
中的错误是预料之中的。所以当试图解决这个问题时,我的第一个想法是简单地复制数组:
// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
for LIdx := Low( LArray ) to High( LArray ) do
NewArray[LIdx] := LArray[LIdx];
很有魅力。但是在那之后我想知道为什么需要数组副本:
LArray := FArray; // copy array reference from field
...
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1);
NewArray[I + 1] := Item;
Exit(I + 1);
元素被复制到NewArray
(局部变量),仅此而已。没有返回到 FArray
的赋值,所以对我来说 NewArray
将在超出范围时完成。
现在我有三个错误修正选择:
只需替换TArray.Copy
SetLength(NewArray, Length(LArray) * 2);
// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
for LIdx := Low( LArray ) to High( LArray ) do
NewArray[LIdx] := LArray[LIdx];
NewArray[I + 1] := Item;
Exit(I + 1);
替换TArray.Copy
并保存NewArray
SetLength(NewArray, Length(LArray) * 2);
// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
for LIdx := Low( LArray ) to High( LArray ) do
NewArray[LIdx] := LArray[LIdx];
NewArray[I + 1] := Item;
FArray := NewArray;
Exit(I + 1);
注释掉所有不必要的代码部分(因为它们只是在浪费时间)
// SetLength(NewArray, Length(LArray) * 2);
// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
// NewArray[I + 1] := Item;
Exit(I + 1);
我用一堆任务检查了所有三个修复程序,寻找未使用的工作线程或未执行的任务。但我没有找到他们中的任何一个。该库按预期工作(现在没有任何异常)。
你能指出我在这里遗漏了什么吗?
为了得到这个异常,您运行了一堆任务并让 TTaskPool
创建了越来越多的 TWorkerQueueThreads
。通过 TaskManager 检查线程数并在 TSparseArray<T>.Add
方法中的 TArray.Copy
行使用断点。当应用程序的线程数超过 25 个线程时,我在这里得到这个异常。
// Hit the button very fast until the debugger stops
// at TSparseArray<T>.Add method to copy the array
procedure TForm1.Button1Click( Sender : TObject );
var
LIdx : Integer;
begin
for LIdx := 1 to 20 do
TTask.Run(
procedure
begin
Sleep( 50 );
end );
end;
这不是 System.CopyArray
中的错误。按照设计,它只支持托管类型。该错误实际上在 TArray.Copy<T>
中。在不区分 T
是否为托管类型的情况下调用 System.CopyArray
是错误的。
但是,来自 XE7 update 1 的最新版本 TArray.Copy<T>
似乎没有遇到您描述的问题。代码如下所示:
class procedure TArray.Copy<T>(const Source, Destination: array of T;
SourceIndex, DestIndex, Count: NativeInt);
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex,
Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[SourceIndex]),
Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^,
Count * SizeOf(T));
end;
除非我的分析有误,否则您只需应用更新 1 即可解决 System.CopyArray
的问题。
但正如 Uwe 在下面的评论中指出的那样,这段代码仍然是伪造的。它在应该使用 DestIndex
的地方错误地使用了 SourceIndex
。源参数和目标参数的传递顺序错误。人们还想知道为什么作者写 Pointer(@Destination[SourceIndex])^
而不是 Destination[SourceIndex]
。我发现整个情况非常令人沮丧。 Embarcadero 怎么能发布质量如此骇人听闻的代码?
比上述更深层次的问题是 TSparseArray<T>
。看起来像这样:
function TSparseArray<T>.Add(const Item: T): Integer;
var
I: Integer;
LArray, NewArray: TArray<T>;
begin
while True do
begin
LArray := FArray;
TMonitor.Enter(FLock);
try
for I := 0 to Length(LArray) - 1 do
begin
if LArray[I] = nil then
begin
FArray[I] := Item;
Exit(I);
end else if I = Length(LArray) - 1 then
begin
if LArray <> FArray then
Continue;
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1);
NewArray[I + 1] := Item;
Exit(I + 1);
end;
end;
finally
TMonitor.Exit(FLock);
end;
end;
end;
唯一一次初始化 FArray
是在 TSparseArray<T>
构造函数中。这意味着如果数组变满,则会添加和丢失项目。据推测 I = Length(LArray) - 1
是为了扩展 FArray
的长度并捕获新项目。但是,还要注意 TSparseArray<T>
通过 Current
属性 暴露了 FArray
。而且这种暴露是不受锁保护的。所以,一旦 FArray
变满,我就看不出这个 class 有什么用处。
我建议您构建一个示例,其中 FArray
变满,以证明添加的项目丢失了。提交一个错误报告来证明这一点,并链接到这个问题。
项目是否写入TSparseArray<T>
并不重要,因为只有当一个工作线程已经完成了所有委托给他的任务而另一个工作线程还没有完成时才需要它。此时空闲线程正在查看池中其他线程的队列并尝试窃取一些工作。
如果任何队列未进入此数组,则空闲线程不可见,因此无法分担工作负载。
为了解决这个问题,我选择了选项 2
function TSparseArray<T>.Add(const Item: T): Integer;
...
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1); // <- No Exception here with XE7U1
NewArray[I + 1] := Item;
{$IFDEF USE_BUGFIX}
FArray := NewArray;
{$ENDIF}
Exit(I + 1);
但是在没有任何锁定的情况下实施窃取部分是有风险的
procedure TThreadPool.TQueueWorkerThread.Execute;
...
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
if (ThreadPool.FQueues.Current[I] <> nil)
and (ThreadPool.FQueues.Current[I] <> WorkQueue)
and ThreadPool.FQueues.Current[I].TrySteal(Item)
then
Break;
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
数组长度只会增加
while I < Length(ThreadPool.FQueues.Current) do
和
if I <> Length(ThreadPool.FQueues.Current) then
应该够安全了。
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
{$IFDEF USE_BUGFIX}
TMonitor.Enter(ThreadPool.FQueues);
try
{$ENDIF}
if (ThreadPool.FQueues.Current[I] <> nil) and (ThreadPool.FQueues.Current[I] <> WorkQueue) and ThreadPool.FQueues.Current[I].TrySteal(Item) then
Break;
{$IFDEF USE_BUGFIX}
finally
TMonitor.Exit(ThreadPool.FQueues);
end;
{$ENDIF}
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
现在我们需要一个测试环境来观看偷窃:
program WatchStealingTasks;
{$APPTYPE CONSOLE}
{$R *.res}
uses
Winapi.Windows,
System.SysUtils,
System.Threading,
System.Classes,
System.Math;
procedure OutputDebugStr( const AStr: string ); overload;
begin
OutputDebugString( PChar( AStr ) );
end;
procedure OutputDebugStr( const AFormat: string; const AParams: array of const ); overload;
begin
OutputDebugStr( Format( AFormat, AParams ) );
end;
function CreateInnerTask( AThreadId: Cardinal; AValue: Integer; APool: TThreadPool ): ITask;
begin
Result := TTask.Run(
procedure
begin
Sleep( AValue );
if AThreadId <> TThread.CurrentThread.ThreadID
then
OutputDebugStr( '[%d] executed stolen task from [%d]', [TThread.CurrentThread.ThreadID, AThreadId] )
else
OutputDebugStr( '[%d] executed task', [TThread.CurrentThread.ThreadID] );
end, APool );
end;
function CreateTask( AValue: Integer; APool: TThreadPool ): ITask;
begin
Result := TTask.Run(
procedure
var
LIdx: Integer;
LTasks: TArray<ITask>;
begin
// Create three inner tasks per task
SetLength( LTasks, 3 );
for LIdx := Low( LTasks ) to High( LTasks ) do
begin
LTasks[LIdx] := CreateInnerTask( TThread.CurrentThread.ThreadID, AValue, APool );
end;
OutputDebugStr( '[%d] waiting for tasks completion', [TThread.CurrentThread.ThreadID] );
TTask.WaitForAll( LTasks );
OutputDebugStr( '[%d] task finished', [TThread.CurrentThread.ThreadID] );
end, APool );
end;
procedure Test;
var
LPool: TThreadPool;
LIdx: Integer;
LTasks: TArray<ITask>;
begin
OutputDebugStr( 'Test started' );
try
LPool := TThreadPool.Create;
try
// Create three tasks
SetLength( LTasks, 3 );
for LIdx := Low( LTasks ) to High( LTasks ) do
begin
// Let's put some heavy work (200ms) on the first tasks shoulder
// and the other tasks just some light work (20ms) to do
LTasks[LIdx] := CreateTask( IfThen( LIdx = 0, 200, 20 ), LPool );
end;
TTask.WaitForAll( LTasks );
finally
LPool.Free;
end;
finally
OutputDebugStr( 'Test completed' );
end;
end;
begin
try
Test;
except
on E: Exception do
Writeln( E.ClassName, ': ', E.Message );
end;
ReadLn;
end.
并且调试日志是
Debug-Ausgabe: Test started Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2104. Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Thread-Start: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] waiting for tasks completion Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] executed task Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2188] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [4948] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: [2104] task finished Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: Thread Exiting: 2188 Prozess WatchStealingTasks.exe (4532)
Debug-Ausgabe: Thread Exiting: 4948 Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532)
Thread-Ende: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532)
好的,窃取现在应该可以使用任意数量的工作线程,所以一切都好吗?
否
这个小测试应用不会结束,因为现在它冻结在线程池的析构函数中。由于
最后一个工作线程不会终止
procedure TThreadPool.TQueueWorkerThread.Execute;
...
if ThreadPool.FWorkerThreadCount = 1 then
begin
// it is the last thread after all tasks executed, but
// FQueuedRequestCount is still on 7 - WTF
if ThreadPool.FQueuedRequestCount = 0 then
begin
这里还有一个要修复的错误...因为当使用 Task.WaitForAll
等待任务时,您现在等待的所有任务都在内部执行,但不会减少 FQueuedRequestCount
。
解决这个问题
function TThreadPool.TryRemoveWorkItem(const WorkerData: IThreadPoolWorkItem): Boolean;
begin
Result := (QueueThread <> nil) and (QueueThread.WorkQueue <> nil);
if Result then
Result := QueueThread.WorkQueue.LocalFindAndRemove(WorkerData);
{$IFDEF USE_BUGFIX}
if Result then
DecWorkRequestCount;
{$ENDIF}
end;
现在它的运行就像它应该立即完成的那样。
更新
作为 Uwe 的评论,我们还需要修复固定的 System.Generics.Collections.TArray.Copy<T>
class procedure TArray.Copy<T>(const Source, Destination: array of T; SourceIndex, DestIndex, Count: NativeInt);
{$IFDEF USE_BUGFIX}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[DestIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Source[SourceIndex])^,Pointer(@Destination[DestIndex])^, Count * SizeOf(T) );
end;
{$ELSE}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[SourceIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, Count * SizeOf(T));
end;
{$ENDIF}
要测试的简单检查:
procedure TestArrayCopy;
var
LArr1, LArr2: TArray<Integer>;
begin
LArr1 := TArray<Integer>.Create( 10, 11, 12, 13 );
LArr2 := TArray<Integer>.Create( 20, 21 );
// copy the last 2 elements from LArr1 to LArr2
TArray.Copy<Integer>( LArr1, LArr2, 2, 0, 2 );
end;
- 在 XE7 中你会得到一个例外
- 使用 XE7 Update1,您将获得
LArr1 = ( 10, 11, 0, 0 )
LArr2 = ( 20, 21 )
- 通过上面的修复将得到
LArr1 = ( 10, 11, 12, 13 )
LArr2 = ( 12, 13 )
由于 System.Generics.Collections.TArray.Copy<T>
(取决于 already reported bug in System.CopyArray
)的未修复错误,有时会在使用线程库时引发异常。
异常在方法 System.Threading.TSparseArray<T>.Add
:
function TSparseArray<T>.Add(const Item: T): Integer;
var
I: Integer;
LArray, NewArray: TArray<T>;
begin
...
TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
...
end;
好吧,System.CopyArray
中的错误是预料之中的。所以当试图解决这个问题时,我的第一个想法是简单地复制数组:
// TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here
for LIdx := Low( LArray ) to High( LArray ) do
NewArray[LIdx] := LArray[LIdx];
很有魅力。但是在那之后我想知道为什么需要数组副本:
LArray := FArray; // copy array reference from field
...
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1);
NewArray[I + 1] := Item;
Exit(I + 1);
元素被复制到NewArray
(局部变量),仅此而已。没有返回到 FArray
的赋值,所以对我来说 NewArray
将在超出范围时完成。
现在我有三个错误修正选择:
只需替换
TArray.Copy
SetLength(NewArray, Length(LArray) * 2); // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here for LIdx := Low( LArray ) to High( LArray ) do NewArray[LIdx] := LArray[LIdx]; NewArray[I + 1] := Item; Exit(I + 1);
替换
TArray.Copy
并保存NewArray
SetLength(NewArray, Length(LArray) * 2); // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here for LIdx := Low( LArray ) to High( LArray ) do NewArray[LIdx] := LArray[LIdx]; NewArray[I + 1] := Item; FArray := NewArray; Exit(I + 1);
注释掉所有不必要的代码部分(因为它们只是在浪费时间)
// SetLength(NewArray, Length(LArray) * 2); // TArray.Copy<T>(LArray, NewArray, I + 1); // <- Exception here // NewArray[I + 1] := Item; Exit(I + 1);
我用一堆任务检查了所有三个修复程序,寻找未使用的工作线程或未执行的任务。但我没有找到他们中的任何一个。该库按预期工作(现在没有任何异常)。
你能指出我在这里遗漏了什么吗?
为了得到这个异常,您运行了一堆任务并让 TTaskPool
创建了越来越多的 TWorkerQueueThreads
。通过 TaskManager 检查线程数并在 TSparseArray<T>.Add
方法中的 TArray.Copy
行使用断点。当应用程序的线程数超过 25 个线程时,我在这里得到这个异常。
// Hit the button very fast until the debugger stops
// at TSparseArray<T>.Add method to copy the array
procedure TForm1.Button1Click( Sender : TObject );
var
LIdx : Integer;
begin
for LIdx := 1 to 20 do
TTask.Run(
procedure
begin
Sleep( 50 );
end );
end;
这不是 System.CopyArray
中的错误。按照设计,它只支持托管类型。该错误实际上在 TArray.Copy<T>
中。在不区分 T
是否为托管类型的情况下调用 System.CopyArray
是错误的。
但是,来自 XE7 update 1 的最新版本 TArray.Copy<T>
似乎没有遇到您描述的问题。代码如下所示:
class procedure TArray.Copy<T>(const Source, Destination: array of T;
SourceIndex, DestIndex, Count: NativeInt);
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex,
Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[SourceIndex]),
Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^,
Count * SizeOf(T));
end;
除非我的分析有误,否则您只需应用更新 1 即可解决 System.CopyArray
的问题。
但正如 Uwe 在下面的评论中指出的那样,这段代码仍然是伪造的。它在应该使用 DestIndex
的地方错误地使用了 SourceIndex
。源参数和目标参数的传递顺序错误。人们还想知道为什么作者写 Pointer(@Destination[SourceIndex])^
而不是 Destination[SourceIndex]
。我发现整个情况非常令人沮丧。 Embarcadero 怎么能发布质量如此骇人听闻的代码?
比上述更深层次的问题是 TSparseArray<T>
。看起来像这样:
function TSparseArray<T>.Add(const Item: T): Integer;
var
I: Integer;
LArray, NewArray: TArray<T>;
begin
while True do
begin
LArray := FArray;
TMonitor.Enter(FLock);
try
for I := 0 to Length(LArray) - 1 do
begin
if LArray[I] = nil then
begin
FArray[I] := Item;
Exit(I);
end else if I = Length(LArray) - 1 then
begin
if LArray <> FArray then
Continue;
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1);
NewArray[I + 1] := Item;
Exit(I + 1);
end;
end;
finally
TMonitor.Exit(FLock);
end;
end;
end;
唯一一次初始化 FArray
是在 TSparseArray<T>
构造函数中。这意味着如果数组变满,则会添加和丢失项目。据推测 I = Length(LArray) - 1
是为了扩展 FArray
的长度并捕获新项目。但是,还要注意 TSparseArray<T>
通过 Current
属性 暴露了 FArray
。而且这种暴露是不受锁保护的。所以,一旦 FArray
变满,我就看不出这个 class 有什么用处。
我建议您构建一个示例,其中 FArray
变满,以证明添加的项目丢失了。提交一个错误报告来证明这一点,并链接到这个问题。
项目是否写入TSparseArray<T>
并不重要,因为只有当一个工作线程已经完成了所有委托给他的任务而另一个工作线程还没有完成时才需要它。此时空闲线程正在查看池中其他线程的队列并尝试窃取一些工作。
如果任何队列未进入此数组,则空闲线程不可见,因此无法分担工作负载。
为了解决这个问题,我选择了选项 2
function TSparseArray<T>.Add(const Item: T): Integer;
...
SetLength(NewArray, Length(LArray) * 2);
TArray.Copy<T>(LArray, NewArray, I + 1); // <- No Exception here with XE7U1
NewArray[I + 1] := Item;
{$IFDEF USE_BUGFIX}
FArray := NewArray;
{$ENDIF}
Exit(I + 1);
但是在没有任何锁定的情况下实施窃取部分是有风险的
procedure TThreadPool.TQueueWorkerThread.Execute;
...
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
if (ThreadPool.FQueues.Current[I] <> nil)
and (ThreadPool.FQueues.Current[I] <> WorkQueue)
and ThreadPool.FQueues.Current[I].TrySteal(Item)
then
Break;
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
数组长度只会增加
while I < Length(ThreadPool.FQueues.Current) do
和
if I <> Length(ThreadPool.FQueues.Current) then
应该够安全了。
if Signaled then
begin
I := 0;
while I < Length(ThreadPool.FQueues.Current) do
begin
{$IFDEF USE_BUGFIX}
TMonitor.Enter(ThreadPool.FQueues);
try
{$ENDIF}
if (ThreadPool.FQueues.Current[I] <> nil) and (ThreadPool.FQueues.Current[I] <> WorkQueue) and ThreadPool.FQueues.Current[I].TrySteal(Item) then
Break;
{$IFDEF USE_BUGFIX}
finally
TMonitor.Exit(ThreadPool.FQueues);
end;
{$ENDIF}
Inc(I);
end;
if I <> Length(ThreadPool.FQueues.Current) then
Break;
LookedForSteals := True;
end
现在我们需要一个测试环境来观看偷窃:
program WatchStealingTasks;
{$APPTYPE CONSOLE}
{$R *.res}
uses
Winapi.Windows,
System.SysUtils,
System.Threading,
System.Classes,
System.Math;
procedure OutputDebugStr( const AStr: string ); overload;
begin
OutputDebugString( PChar( AStr ) );
end;
procedure OutputDebugStr( const AFormat: string; const AParams: array of const ); overload;
begin
OutputDebugStr( Format( AFormat, AParams ) );
end;
function CreateInnerTask( AThreadId: Cardinal; AValue: Integer; APool: TThreadPool ): ITask;
begin
Result := TTask.Run(
procedure
begin
Sleep( AValue );
if AThreadId <> TThread.CurrentThread.ThreadID
then
OutputDebugStr( '[%d] executed stolen task from [%d]', [TThread.CurrentThread.ThreadID, AThreadId] )
else
OutputDebugStr( '[%d] executed task', [TThread.CurrentThread.ThreadID] );
end, APool );
end;
function CreateTask( AValue: Integer; APool: TThreadPool ): ITask;
begin
Result := TTask.Run(
procedure
var
LIdx: Integer;
LTasks: TArray<ITask>;
begin
// Create three inner tasks per task
SetLength( LTasks, 3 );
for LIdx := Low( LTasks ) to High( LTasks ) do
begin
LTasks[LIdx] := CreateInnerTask( TThread.CurrentThread.ThreadID, AValue, APool );
end;
OutputDebugStr( '[%d] waiting for tasks completion', [TThread.CurrentThread.ThreadID] );
TTask.WaitForAll( LTasks );
OutputDebugStr( '[%d] task finished', [TThread.CurrentThread.ThreadID] );
end, APool );
end;
procedure Test;
var
LPool: TThreadPool;
LIdx: Integer;
LTasks: TArray<ITask>;
begin
OutputDebugStr( 'Test started' );
try
LPool := TThreadPool.Create;
try
// Create three tasks
SetLength( LTasks, 3 );
for LIdx := Low( LTasks ) to High( LTasks ) do
begin
// Let's put some heavy work (200ms) on the first tasks shoulder
// and the other tasks just some light work (20ms) to do
LTasks[LIdx] := CreateTask( IfThen( LIdx = 0, 200, 20 ), LPool );
end;
TTask.WaitForAll( LTasks );
finally
LPool.Free;
end;
finally
OutputDebugStr( 'Test completed' );
end;
end;
begin
try
Test;
except
on E: Exception do
Writeln( E.ClassName, ': ', E.Message );
end;
ReadLn;
end.
并且调试日志是
Debug-Ausgabe: Test started Prozess WatchStealingTasks.exe (4532) Thread-Start: Thread-ID: 2104. Prozess WatchStealingTasks.exe (4532) Thread-Start: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532) Thread-Start: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2104] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) Thread-Start: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] waiting for tasks completion Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] task finished Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] task finished Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2104] executed task Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2188] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [4948] executed stolen task from [2104] Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: [2104] task finished Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: Thread Exiting: 2188 Prozess WatchStealingTasks.exe (4532) Debug-Ausgabe: Thread Exiting: 4948 Prozess WatchStealingTasks.exe (4532) Thread-Ende: Thread-ID: 4948. Prozess WatchStealingTasks.exe (4532) Thread-Ende: Thread-ID: 2188. Prozess WatchStealingTasks.exe (4532) Thread-Ende: Thread-ID: 2212. Prozess WatchStealingTasks.exe (4532)
好的,窃取现在应该可以使用任意数量的工作线程,所以一切都好吗?
否
这个小测试应用不会结束,因为现在它冻结在线程池的析构函数中。由于
最后一个工作线程不会终止procedure TThreadPool.TQueueWorkerThread.Execute;
...
if ThreadPool.FWorkerThreadCount = 1 then
begin
// it is the last thread after all tasks executed, but
// FQueuedRequestCount is still on 7 - WTF
if ThreadPool.FQueuedRequestCount = 0 then
begin
这里还有一个要修复的错误...因为当使用 Task.WaitForAll
等待任务时,您现在等待的所有任务都在内部执行,但不会减少 FQueuedRequestCount
。
解决这个问题
function TThreadPool.TryRemoveWorkItem(const WorkerData: IThreadPoolWorkItem): Boolean;
begin
Result := (QueueThread <> nil) and (QueueThread.WorkQueue <> nil);
if Result then
Result := QueueThread.WorkQueue.LocalFindAndRemove(WorkerData);
{$IFDEF USE_BUGFIX}
if Result then
DecWorkRequestCount;
{$ENDIF}
end;
现在它的运行就像它应该立即完成的那样。
更新
作为 Uwe 的评论,我们还需要修复固定的 System.Generics.Collections.TArray.Copy<T>
class procedure TArray.Copy<T>(const Source, Destination: array of T; SourceIndex, DestIndex, Count: NativeInt);
{$IFDEF USE_BUGFIX}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[DestIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Source[SourceIndex])^,Pointer(@Destination[DestIndex])^, Count * SizeOf(T) );
end;
{$ELSE}
begin
CheckArrays(Pointer(@Source[0]), Pointer(@Destination[0]), SourceIndex, Length(Source), DestIndex, Length(Destination), Count);
if IsManagedType(T) then
System.CopyArray(Pointer(@Destination[SourceIndex]), Pointer(@Source[SourceIndex]), TypeInfo(T), Count)
else
System.Move(Pointer(@Destination[SourceIndex])^, Pointer(@Source[SourceIndex])^, Count * SizeOf(T));
end;
{$ENDIF}
要测试的简单检查:
procedure TestArrayCopy;
var
LArr1, LArr2: TArray<Integer>;
begin
LArr1 := TArray<Integer>.Create( 10, 11, 12, 13 );
LArr2 := TArray<Integer>.Create( 20, 21 );
// copy the last 2 elements from LArr1 to LArr2
TArray.Copy<Integer>( LArr1, LArr2, 2, 0, 2 );
end;
- 在 XE7 中你会得到一个例外
- 使用 XE7 Update1,您将获得
LArr1 = ( 10, 11, 0, 0 ) LArr2 = ( 20, 21 )
- 通过上面的修复将得到
LArr1 = ( 10, 11, 12, 13 ) LArr2 = ( 12, 13 )