线程如何在另一个现有线程的上下文中强制执行函数?

How can a thread force execution of a function in the context of another existing thread?

在一个 Delphi/Linux 程序中,假设我有两个 运行 线程,ThreadA 和 ThreadB。在某个时间点 ThreadB 需要让 ThreadA 执行一个函数并阻塞直到函数 returns.

在 Delphi 中,我们有 TThread.Synchronize 来完成工作,但仅当 ThreadA 是主线程时。

有什么想法吗?我使用 Delphi 但也欢迎使用 C 代码回答。

要做到这一点,线程必须 co-operate,没有跨线程触发事件的机制。但是,如果您准备实现这样的机制,那并不难,当然您可以查看 TThread.Synchronize 的源代码以获取提示。

借用 TThread.Syncrhonize 的来源,我得出了以下结论。您将必须让 co-operating 线程在其主循环中检查其队列 - 这当然是 TThread.Synchronize 的工作方式。

以下代码基于我们在生产中使用的代码 - 如果有评论或引用不在本单元中的项目,我深表歉意。没有提供函数结果的机制,但这可以通过使用不同的调用模板来解决(因此结果类型是已知的)。我已经允许一个结果 TObject(即使没有办法知道当前应该是什么)以便 multi-valued 结果可以在需要时 returned。

下面没有windows具体代码,因此应该按照您的要求在Linux上工作。

unit Unit1;

interface

uses  Classes, SyncObjs, Generics.Collections;

  type
  TQueuedCallback = class(TObject)
  protected
    _pEvent:            TEvent;
    _pResult:           TObject;
    _fnMethod:          TThreadMethod;
    _fnProcedure:       TThreadProcedure;

  public

    property  Event: TEvent read _pEvent write _pEvent;
    property  Result: TObject read _pResult write _pResult;
    property  Method: TThreadMethod read _fnMethod write _fnMethod;
    property  Proc: TThreadProcedure read _fnProcedure write _fnProcedure;

  end;

  TQueueableThread = class(TThread)
  protected
    _pCSLock:           TCriticalSection;
    _pQueuedCalls:      TList<TQueuedCallback>;
    _haSignals:         THandleObjectArray;
    _pQueueEvent:       TEvent;
    _pStopEvent:        TEvent;
    _dwMaxWait:         Cardinal;

    procedure _DoWork(nEventIndex: Integer); virtual; abstract;   // where th thread does it's work
    procedure _ExecuteQueued(blAll: Boolean = False); virtual;
  public
    destructor Destroy; override;

    procedure AfterConstruction(); override;
    procedure Execute(); override;
    procedure QueueProcedure(fnMethod: TThreadMethod); overload; virtual;
    procedure QueueProcedure(fnProcedure: TThreadProcedure); overload; virtual;
    procedure QueueProcedureAndWait(fnMethod: TThreadMethod); overload; virtual;
    procedure QueueProcedureAndWait(fnProcedure: TThreadProcedure); overload; virtual;
    function  QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject; overload; virtual;
    function  QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject; overload; virtual;

  end;

implementation

uses  SysUtils;

  { TQueueableThread }

  procedure TQueueableThread._ExecuteQueued(blAll: Boolean);
  begin
    repeat
      Self._pCSLock.Enter();
      if(Self._pQueuedCalls.Count>0) then
      begin
        if(Assigned(Self._pQueuedCalls.Items[0].Method)) then
          Self._pQueuedCalls.Items[0].Method()
        else if(Assigned(Self._pQueuedCalls.Items[0].Proc)) then
          Self._pQueuedCalls.Items[0].Proc();
        // No mechanism for supplying a result ...
        if(Self._pQueuedCalls.Items[0]._pEvent<>nil) then
          Self._pQueuedCalls.Items[0]._pEvent.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free;
        Self._pQueuedCalls.Delete(0);
      end;
      blAll:=(blAll And (Self._pQueuedCalls.Count>0));
      Self._pCSLock.Leave();
    until not blAll;
  end;

  destructor TQueueableThread.Destroy;
  begin
    if(Self._pQueuedCalls<>nil) then
    begin
      while(Self._pQueuedCalls.Count>0) do
      begin
        if(Self._pQueuedCalls.Items[0].Event<>nil) then
          Self._pQueuedCalls.Items[0].Event.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free();
        Self._pQueuedCalls.Delete(0);
      end;
      FreeAndNil(Self._pQueuedCalls);
    end;
    FreeAndNil(Self._pQueueEvent);
    FreeAndNil(Self._pStopEvent);
    FreeAndNil(Self._pCSLock);
    inherited;
  end;

  procedure TQueueableThread.AfterConstruction();
  begin
    inherited;
    Self._pCSLock:=TCriticalSection.Create();
    Self._pQueuedCalls:=TList<TQueuedCallback>.Create();
    SetLength(Self._haSignals, 2);
    Self._pQueueEvent:=TEvent.Create();
    Self._haSignals[0]:=Self._pQueueEvent;
    Self._pStopEvent:=TEvent.Create();
    Self._haSignals[1]:=Self._pStopEvent;
    Self._dwMaxWait:=30000;
  end;

  procedure TQueueableThread.Execute();
  var
    dwWaitResult:       TWaitResult;
    nEventIndex:        Integer;
    nLoop:              Integer;
    pSignalled:         THandleObject;
  begin
    while(not Self.Terminated) do
    begin
      //LogThreadMessage(GetCurrentThreadId(), Self.ClassType, Format('WaitingFor: %u', [Self._MaxWaitTime]));
      dwWaitResult:=THandleObject.WaitForMultiple(Self._haSignals, Self._dwMaxWait, False, pSignalled);
      //LogThreadMessage(GetCurrentThreadId(), Self.ClassType, Format('WaitForMultipleObjects Result: %u', [dwWaitResult]));
      if(dwWaitResult=wrError) then
        Self.Terminate;
      if not Self.Terminated then
      begin
        if(pSignalled=Self._pQueueEvent) then
        begin
          Self._ExecuteQueued(True);
          Self._pQueueEvent.ResetEvent();
        end
        else if(pSignalled=Self._pStopEvent) then
          Self.Terminate()
        else
        begin
          nEventIndex:=-2;
          if(dwWaitResult=wrTimeout) then
            nEventIndex:=-1
          else
          begin
            nLoop:=0;
            while( (nEventIndex<0) And (nLoop<Length(Self._haSignals)) ) do
            begin
              if(Self._haSignals[nLoop]=pSignalled) then
                nEventIndex:=nLoop
              else
                Inc(nLoop);
            end;
            if(nEventIndex>-2) then
            begin
              try
                Self._DoWork(nEventIndex);
              except
                on e: Exception do
                  // error handling
              end;
            end;
          end;
        end;
      end;
    end;
  end;

  procedure TQueueableThread.QueueProcedure(fnMethod: TThreadMethod);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
    end;
  end;

  procedure TQueueableThread.QueueProcedure(fnProcedure: TThreadProcedure);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnProcedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
    end;
  end;

  procedure TQueueableThread.QueueProcedureAndWait(fnMethod: TThreadMethod);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  procedure TQueueableThread.QueueProcedureAndWait(fnProcedure: TThreadProcedure);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnPRocedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  function  TQueueableThread.QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject;
  var
    pQueue:             TQueuedCallback;
  begin
    Result:=nil;
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      Result:=pQueue._pResult;
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  function  TQueueableThread.QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject;
  var
    pQueue:             TQueuedCallback;
  begin
    Result:=nil;
    if(Assigned(fnProcedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      Result:=pQueue._pResult;
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

end.

您可以继承 TQueuedCallback 的 类 使用特定的调用模板,这将是识别 return 值

的一种方法