线程池、线程安全队列、OOP

Thread pool, Threadsafe Queue, OOP

假设以太网上有一些非 PC 设备,基于给定的 API,我可以使用 UDP 与它们通信。

我想为 UDP 通信使用线程池和线程安全队列。每个工作线程都有自己的 Indy TIdUDPclient 实例。通信就像向设备发送一个 UDP 数据报然后等待应答。答案是在数据段有 >2 个字节的 UDP 数据报。

设备由 TDevice class 个实例表示。

TDevice = class(TObject)
private
  ...
  FStatus: byte;
  ...
public
  ...
  function GetStatus(): integer; //API commandID = 68 (Getting device Status)
end;

thPool 用于 creating/managing 线程并将作业推送到队列:

TthPool = class(TObject)
private
  FQueue: TThreadedQueue<TrUDPdirectJob*>;
  FthWorkers: TList<TthWorker>;
public
  constructor Create( AthCount, AQueueDepth: integer); //creating the pool here
  function SendCommand( ArSendJob: TrSendJob );
end;

function SendCommand(ArSendJob TrSendJob): integer; 
begin
  ...
  FQueue.PushItem( ArSendJob );
end;

TDevice 的一个功能是获取它所代表的硬件的状态,并根据接收到的信息设置它的 FStatus 的值答案:

function TDevice.GetStatus(): integer;  //command byte: 68
const
  PARAMSLENGTH = 4;
var
  rSendJob: TrSendJob*        
begin
  rSendJob.IP := self.FIP;
  rSendJob.port := self.Fport;
  rSendJob.commandID := 68;
  rSendJob.paramslength := PARAMLENGTH ; //need to send the length (API req) 
  SetLength( rSendJob.params, PARAMSLENGTH  );  
  rSendJob.params[0] := $A; //just some parameters along the commandID
  rSendJob.params[1] := $B;
  rSendJob.params[2] := $C;
  rSendJob.params[3] := $D;
  ...
  thPool.SendCommand( rSendJob );   //pushing the job to the queue   
end;

*TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Array of byte
  paramslength: byte;
end;

从工作线程发出 UDP 数据报:

procedure TthWorker.Execute;
var
  sendBuffer: TIDbytes;
  rBuffer: TIdBytes;
  rSendJob: TrSendJob;
begin
  inherited;
  repeat
    FQueue.PopItem(rSendJob); //Getting the job from the queue
    //Building the sending buffer
    sendBuffer[0] := rSendJob. ... ; 
    ...
    FIdUDPclient.SendBuffer( rSendJob.IP, rSendJob.port, sendBuffer );
    if FIdUDPclient.receiveBuffer(rbuffer, RECTIMEOUT_GLOBAL) >= 1 then
    begin
      //
    end;
  until Terminated;
end;

总之,thPoolSendCommand 将作业推入队列,工作线程从队列中拉取作业,然后发出 UDP 数据报,然后在 rbuffer 中取回应答字节。但是此时,如何将receivebuffer的内容传回给TDevice实例进行处理呢?这样做的正确(OOP)方法是什么?我需要在 TDevice 方法中进行处理。

如果我在 TrSendJob 定义一个加方法指针字段,那么工作线程知道调用什么方法来处理缓冲区并设置 FState TDevice?

的值
TProcessReceiveBuffer = function(ArBuffer: TIdBytes): integer;

TrSendJob = record  //Job define
  ip: string;
  port: integer;
  commandID: byte;        
  params: Tparams; //Tparams array of byte
  paramslength: byte;
  *ProcessReceiveBuffer: TProcessReceiveBuffer; //pointer to a TDevice method thats processing the receivebuffer
end;

或者,整个概念是错误的。

But at this point, how to send the contents of receivebuffer back to TDevice instance for processing?

您可以在 TrSendJob 中包含一个 TDevice 对象指针,然后 TthWorker 将知道将缓冲区传递给哪个 TDevice

或者,您可以将缓冲区和 TEvent 放入 TrSendJob,然后让 TthWorker 填充缓冲区并发出事件信号,然后让 TDevice.GetStatus() 等待事件信号并处理缓冲区。