线程池、线程安全队列、OOP
Thread pool, Threadsafe Queue, OOP
假设以太网上有一些非 PC 设备,基于给定的 API,我可以使用 UDP 与它们通信。
我想为 UDP 通信使用线程池和线程安全队列。每个工作线程都有自己的 Indy TIdUDPclient 实例。通信就像向设备发送一个 UDP 数据报然后等待应答。答案是在数据段有 >2 个字节的 UDP 数据报。
设备由 TDevice class 个实例表示。
TDevice = class(TObject)
private
...
FStatus: byte;
...
public
...
function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;
thPool 用于 creating/managing 线程并将作业推送到队列:
TthPool = class(TObject)
private
FQueue: TThreadedQueue<TrUDPdirectJob*>;
FthWorkers: TList<TthWorker>;
public
constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
function SendCommand( ArSendJob: TrSendJob );
end;
function SendCommand(ArSendJob TrSendJob): integer;
begin
...
FQueue.PushItem( ArSendJob );
end;
TDevice 的一个功能是获取它所代表的硬件的状态,并根据接收到的信息设置它的 FStatus 的值答案:
function TDevice.GetStatus(): integer; //command byte: 68
const
PARAMSLENGTH = 4;
var
rSendJob: TrSendJob*
begin
rSendJob.IP := self.FIP;
rSendJob.port := self.Fport;
rSendJob.commandID := 68;
rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req)
SetLength( rSendJob.params, PARAMSLENGTH );
rSendJob.params[0] := $A; //just some parameters along the commandID
rSendJob.params[1] := $B;
rSendJob.params[2] := $C;
rSendJob.params[3] := $D;
...
thPool.SendCommand( rSendJob ); //pushing the job to the queue
end;
*TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Array of byte
paramslength: byte;
end;
从工作线程发出 UDP 数据报:
procedure TthWorker.Execute;
var
sendBuffer: TIDbytes;
rBuffer: TIdBytes;
rSendJob: TrSendJob;
begin
inherited;
repeat
FQueue.PopItem(rSendJob); //Getting the job from the queue
//Building the sending buffer
sendBuffer[0] := rSendJob. ... ;
...
FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
begin
//
end;
until Terminated;
end;
总之,thPool 的 SendCommand 将作业推入队列,工作线程从队列中拉取作业,然后发出 UDP 数据报,然后在 rbuffer 中取回应答字节。但是此时,如何将receivebuffer的内容传回给TDevice实例进行处理呢?这样做的正确(OOP)方法是什么?我需要在 TDevice 方法中进行处理。
如果我在 TrSendJob 定义一个加方法指针字段,那么工作线程知道调用什么方法来处理缓冲区并设置 FState TDevice?
的值
TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;
TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Tparams array of byte
paramslength: byte;
*ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;
或者,整个概念是错误的。
But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?
您可以在 TrSendJob
中包含一个 TDevice
对象指针,然后 TthWorker
将知道将缓冲区传递给哪个 TDevice
。
或者,您可以将缓冲区和 TEvent
放入 TrSendJob
,然后让 TthWorker
填充缓冲区并发出事件信号,然后让 TDevice.GetStatus()
等待事件信号并处理缓冲区。
假设以太网上有一些非 PC 设备,基于给定的 API,我可以使用 UDP 与它们通信。
我想为 UDP 通信使用线程池和线程安全队列。每个工作线程都有自己的 Indy TIdUDPclient 实例。通信就像向设备发送一个 UDP 数据报然后等待应答。答案是在数据段有 >2 个字节的 UDP 数据报。
设备由 TDevice class 个实例表示。
TDevice = class(TObject)
private
...
FStatus: byte;
...
public
...
function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;
thPool 用于 creating/managing 线程并将作业推送到队列:
TthPool = class(TObject)
private
FQueue: TThreadedQueue<TrUDPdirectJob*>;
FthWorkers: TList<TthWorker>;
public
constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
function SendCommand( ArSendJob: TrSendJob );
end;
function SendCommand(ArSendJob TrSendJob): integer;
begin
...
FQueue.PushItem( ArSendJob );
end;
TDevice 的一个功能是获取它所代表的硬件的状态,并根据接收到的信息设置它的 FStatus 的值答案:
function TDevice.GetStatus(): integer; //command byte: 68
const
PARAMSLENGTH = 4;
var
rSendJob: TrSendJob*
begin
rSendJob.IP := self.FIP;
rSendJob.port := self.Fport;
rSendJob.commandID := 68;
rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req)
SetLength( rSendJob.params, PARAMSLENGTH );
rSendJob.params[0] := $A; //just some parameters along the commandID
rSendJob.params[1] := $B;
rSendJob.params[2] := $C;
rSendJob.params[3] := $D;
...
thPool.SendCommand( rSendJob ); //pushing the job to the queue
end;
*TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Array of byte
paramslength: byte;
end;
从工作线程发出 UDP 数据报:
procedure TthWorker.Execute;
var
sendBuffer: TIDbytes;
rBuffer: TIdBytes;
rSendJob: TrSendJob;
begin
inherited;
repeat
FQueue.PopItem(rSendJob); //Getting the job from the queue
//Building the sending buffer
sendBuffer[0] := rSendJob. ... ;
...
FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
begin
//
end;
until Terminated;
end;
总之,thPool 的 SendCommand 将作业推入队列,工作线程从队列中拉取作业,然后发出 UDP 数据报,然后在 rbuffer 中取回应答字节。但是此时,如何将receivebuffer的内容传回给TDevice实例进行处理呢?这样做的正确(OOP)方法是什么?我需要在 TDevice 方法中进行处理。
如果我在 TrSendJob 定义一个加方法指针字段,那么工作线程知道调用什么方法来处理缓冲区并设置 FState TDevice?
的值TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;
TrSendJob = record //Job define
ip: string;
port: integer;
commandID: byte;
params: Tparams; //Tparams array of byte
paramslength: byte;
*ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;
或者,整个概念是错误的。
But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?
您可以在 TrSendJob
中包含一个 TDevice
对象指针,然后 TthWorker
将知道将缓冲区传递给哪个 TDevice
。
或者,您可以将缓冲区和 TEvent
放入 TrSendJob
,然后让 TthWorker
填充缓冲区并发出事件信号,然后让 TDevice.GetStatus()
等待事件信号并处理缓冲区。