如何检查线程当前是否 运行
How to check if a thread is currently running
我正在设计一个具有以下特点的线程池。
- 仅当所有其他线程都处于 运行.
时才应生成新线程
- 最大线程数应该是可配置的。
- 当一个线程正在等待时,它应该能够处理新的请求。
- 每个 IO 操作都应在完成时调用回调
- 线程应该有办法管理请求的服务和 IO 回调
代码如下:
unit ThreadUtilities;
interface
uses
Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
fis32MaxThreadCount : Integer;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
procedure SetMaxThreadCount(const pis32MaxThreadCount : Integer);
function GetMaxThreadCount : Integer;
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
property MaxThreadCount : Integer read GetMaxThreadCount write SetMaxThreadCount;
end;
implementation
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue = 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finialize pointer on to the queue
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
Finalize;
end;
end;
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
Raise EThreadStackFinalized.Create('Stack is finalized');
//-- Add/Push a pointer on to the end of the queue
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
按照 J 的建议更改了代码...还添加了关键部分,但我现在面临的问题是,当我尝试调用多个任务时,只使用了一个线程,假设我在其中添加了 5 个线程池然后只使用一个线程,即线程 1。请在下面的部分中检查我的客户端代码。
procedure TSimpleThread.Execute;
begin
// if Assigned(FExecuteEvent) then
// FExecuteEvent(Self);
while not self.Terminated do begin
try
// FGoEvent.WaitFor(INFINITE);
// FGoEvent.ResetEvent;
EnterCriticalSection(csCriticalSection);
if self.Terminated then break;
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
finally
LeaveCriticalSection(csCriticalSection);
// HandleException;
end;
end;
end;
在Add
方法中,如何检查是否有任何线程不忙,如果不忙则重用它,否则创建一个新线程并将其添加到ThreadPool列表中?
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
// if FThreads.Count < MaxThreadCount then
// begin
// FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
// end;
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count = 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
end;
end;
end;
function TThreadPool.GetMaxThreadCount: Integer;
begin
Result := fis32MaxThreadCount;
end;
procedure TThreadPool.SetMaxThreadCount(const pis32MaxThreadCount: Integer);
begin
fis32MaxThreadCount := pis32MaxThreadCount;
end;
end.
客户代码:
这是我创建的用于将数据记录在文本文件中的客户端:
单元线程客户端;
interface
uses Windows, SysUtils, Classes, ThreadUtilities;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
procedure SetMaxThreadCount(const pis32MaxThreadCnt : Integer);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 5);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
los32Idx : Integer;
begin
Request := Data;
try
for los32Idx := 0 to 100 do
begin
LogToFile(FFileName, IntToStr( AThread.ThreadID) + Request^.LogText);
end;
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
procedure TThreadFileLog.SetMaxThreadCount(const pis32MaxThreadCnt: Integer);
begin
FThreadPool.MaxThreadCount := pis32MaxThreadCnt;
end;
end.
这是我添加了三个按钮的表单应用程序,每次单击按钮都会将一些值写入带有线程 ID 和文本消息的文件。但问题是线程 ID 始终相同
unit ThreadPool;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, ThreadClient;
type
TForm5 = class(TForm)
Button1: TButton;
Button2: TButton;
Button3: TButton;
Edit1: TEdit;
procedure Button1Click(Sender: TObject);
procedure FormCreate(Sender: TObject);
procedure Button2Click(Sender: TObject);
procedure Button3Click(Sender: TObject);
procedure Edit1Change(Sender: TObject);
private
{ Private declarations }
fiFileLog : TThreadFileLog;
public
{ Public declarations }
end;
var
Form5: TForm5;
implementation
{$R *.dfm}
procedure TForm5.Button1Click(Sender: TObject);
begin
fiFileLog.Log('Button one click');
end;
procedure TForm5.Button2Click(Sender: TObject);
begin
fiFileLog.Log('Button two click');
end;
procedure TForm5.Button3Click(Sender: TObject);
begin
fiFileLog.Log('Button three click');
end;
procedure TForm5.Edit1Change(Sender: TObject);
begin
fiFileLog.SetMaxThreadCount(StrToInt(Edit1.Text));
end;
procedure TForm5.FormCreate(Sender: TObject);
begin
fiFileLog := TThreadFileLog.Create('C:/test123.txt');
end;
end.
首先,可能也是最强烈建议的,您可以考虑使用像 OmniThread 这样的库来实现线程池。艰苦的工作已经为您完成,您可能最终会使用自己的解决方案制作出不合格且有缺陷的产品。除非您有特殊要求,否则这可能是最快和最简单的解决方案。
就是说,如果您想尝试这样做...
您可能会考虑在启动时而不是按需创建池中的所有线程。如果服务器在任何时候都很忙,那么它最终会很快得到一个 MaxThreadCount
的池。
无论如何,如果您想让线程池保持活动状态并可用于工作,那么它们需要遵循与您编写的模型略有不同的模型。
考虑:
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
在这里,当您 运行 您的线程时,它将执行此回调,然后终止。这似乎不是你想要的。您似乎想要的是让线程保持活动状态,但等待它的下一个工作包。我使用一个基本线程class(用于池)和一个看起来像这样的执行方法(这有点简化):
procedure TMyCustomThread.Execute;
begin
while not self.Terminated do begin
try
FGoEvent.WaitFor(INFINITE);
FGoEvent.ResetEvent;
if self.Terminated then break;
MainExecute;
except
HandleException;
end;
end;
end;
这里FGoEvent
是一个TEvent
。实现 class 定义了工作包在抽象 MainExecute
方法中的样子,但无论它是什么,线程都会执行它的工作,然后 return 等待 FGoEvent
表示它有新的工作要做。
在您的情况下,您需要跟踪哪些线程正在等待,哪些正在工作。您可能需要某种管理器 class 来跟踪这些线程对象。为每个分配一些像 threadID 这样简单的东西似乎是明智的。对于每个线程,就在启动它之前,记录下它当前正忙。在工作包的最后,您可以 post 向管理器 class 返回一条消息,告诉它工作已完成(并且它可以将线程标记为可用于工作)。
当您将工作添加到队列中时,您可以先检查 运行 工作的可用线程(如果您希望遵循您概述的模型,则可以创建一个新线程)。如果有线程则启动任务,如果没有则将工作推入工作队列。当工作线程报告完成时,管理器可以检查队列中是否有未完成的工作。如果有工作,它可以立即重新部署线程。如果没有工作,它可以将线程标记为可工作(在这里您可以为可用工作人员使用第二个队列)。
完整的实现太复杂了,无法在此处用一个答案记录 - 这只是为了粗略地提出一些一般性想法。
我正在设计一个具有以下特点的线程池。
- 仅当所有其他线程都处于 运行. 时才应生成新线程
- 最大线程数应该是可配置的。
- 当一个线程正在等待时,它应该能够处理新的请求。
- 每个 IO 操作都应在完成时调用回调
- 线程应该有办法管理请求的服务和 IO 回调
代码如下:
unit ThreadUtilities;
interface
uses
Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
fis32MaxThreadCount : Integer;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
procedure SetMaxThreadCount(const pis32MaxThreadCount : Integer);
function GetMaxThreadCount : Integer;
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
property MaxThreadCount : Integer read GetMaxThreadCount write SetMaxThreadCount;
end;
implementation
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue = 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finialize pointer on to the queue
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
Finalize;
end;
end;
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
Raise EThreadStackFinalized.Create('Stack is finalized');
//-- Add/Push a pointer on to the end of the queue
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
按照 J 的建议更改了代码...还添加了关键部分,但我现在面临的问题是,当我尝试调用多个任务时,只使用了一个线程,假设我在其中添加了 5 个线程池然后只使用一个线程,即线程 1。请在下面的部分中检查我的客户端代码。
procedure TSimpleThread.Execute;
begin
// if Assigned(FExecuteEvent) then
// FExecuteEvent(Self);
while not self.Terminated do begin
try
// FGoEvent.WaitFor(INFINITE);
// FGoEvent.ResetEvent;
EnterCriticalSection(csCriticalSection);
if self.Terminated then break;
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
finally
LeaveCriticalSection(csCriticalSection);
// HandleException;
end;
end;
end;
在Add
方法中,如何检查是否有任何线程不忙,如果不忙则重用它,否则创建一个新线程并将其添加到ThreadPool列表中?
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
// if FThreads.Count < MaxThreadCount then
// begin
// FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
// end;
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count = 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
end;
end;
end;
function TThreadPool.GetMaxThreadCount: Integer;
begin
Result := fis32MaxThreadCount;
end;
procedure TThreadPool.SetMaxThreadCount(const pis32MaxThreadCount: Integer);
begin
fis32MaxThreadCount := pis32MaxThreadCount;
end;
end.
客户代码: 这是我创建的用于将数据记录在文本文件中的客户端: 单元线程客户端;
interface
uses Windows, SysUtils, Classes, ThreadUtilities;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
procedure SetMaxThreadCount(const pis32MaxThreadCnt : Integer);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 5);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
los32Idx : Integer;
begin
Request := Data;
try
for los32Idx := 0 to 100 do
begin
LogToFile(FFileName, IntToStr( AThread.ThreadID) + Request^.LogText);
end;
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
procedure TThreadFileLog.SetMaxThreadCount(const pis32MaxThreadCnt: Integer);
begin
FThreadPool.MaxThreadCount := pis32MaxThreadCnt;
end;
end.
这是我添加了三个按钮的表单应用程序,每次单击按钮都会将一些值写入带有线程 ID 和文本消息的文件。但问题是线程 ID 始终相同
unit ThreadPool;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, ThreadClient;
type
TForm5 = class(TForm)
Button1: TButton;
Button2: TButton;
Button3: TButton;
Edit1: TEdit;
procedure Button1Click(Sender: TObject);
procedure FormCreate(Sender: TObject);
procedure Button2Click(Sender: TObject);
procedure Button3Click(Sender: TObject);
procedure Edit1Change(Sender: TObject);
private
{ Private declarations }
fiFileLog : TThreadFileLog;
public
{ Public declarations }
end;
var
Form5: TForm5;
implementation
{$R *.dfm}
procedure TForm5.Button1Click(Sender: TObject);
begin
fiFileLog.Log('Button one click');
end;
procedure TForm5.Button2Click(Sender: TObject);
begin
fiFileLog.Log('Button two click');
end;
procedure TForm5.Button3Click(Sender: TObject);
begin
fiFileLog.Log('Button three click');
end;
procedure TForm5.Edit1Change(Sender: TObject);
begin
fiFileLog.SetMaxThreadCount(StrToInt(Edit1.Text));
end;
procedure TForm5.FormCreate(Sender: TObject);
begin
fiFileLog := TThreadFileLog.Create('C:/test123.txt');
end;
end.
首先,可能也是最强烈建议的,您可以考虑使用像 OmniThread 这样的库来实现线程池。艰苦的工作已经为您完成,您可能最终会使用自己的解决方案制作出不合格且有缺陷的产品。除非您有特殊要求,否则这可能是最快和最简单的解决方案。
就是说,如果您想尝试这样做...
您可能会考虑在启动时而不是按需创建池中的所有线程。如果服务器在任何时候都很忙,那么它最终会很快得到一个 MaxThreadCount
的池。
无论如何,如果您想让线程池保持活动状态并可用于工作,那么它们需要遵循与您编写的模型略有不同的模型。
考虑:
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
在这里,当您 运行 您的线程时,它将执行此回调,然后终止。这似乎不是你想要的。您似乎想要的是让线程保持活动状态,但等待它的下一个工作包。我使用一个基本线程class(用于池)和一个看起来像这样的执行方法(这有点简化):
procedure TMyCustomThread.Execute;
begin
while not self.Terminated do begin
try
FGoEvent.WaitFor(INFINITE);
FGoEvent.ResetEvent;
if self.Terminated then break;
MainExecute;
except
HandleException;
end;
end;
end;
这里FGoEvent
是一个TEvent
。实现 class 定义了工作包在抽象 MainExecute
方法中的样子,但无论它是什么,线程都会执行它的工作,然后 return 等待 FGoEvent
表示它有新的工作要做。
在您的情况下,您需要跟踪哪些线程正在等待,哪些正在工作。您可能需要某种管理器 class 来跟踪这些线程对象。为每个分配一些像 threadID 这样简单的东西似乎是明智的。对于每个线程,就在启动它之前,记录下它当前正忙。在工作包的最后,您可以 post 向管理器 class 返回一条消息,告诉它工作已完成(并且它可以将线程标记为可用于工作)。
当您将工作添加到队列中时,您可以先检查 运行 工作的可用线程(如果您希望遵循您概述的模型,则可以创建一个新线程)。如果有线程则启动任务,如果没有则将工作推入工作队列。当工作线程报告完成时,管理器可以检查队列中是否有未完成的工作。如果有工作,它可以立即重新部署线程。如果没有工作,它可以将线程标记为可工作(在这里您可以为可用工作人员使用第二个队列)。
完整的实现太复杂了,无法在此处用一个答案记录 - 这只是为了粗略地提出一些一般性想法。