线程如何在另一个现有线程的上下文中强制执行函数?
How can a thread force execution of a function in the context of another existing thread?
在一个 Delphi/Linux 程序中,假设我有两个正在运行的线程 ThreadA 和 ThreadB。在某个时间点,ThreadB 需要让 ThreadA 执行一个函数,并阻塞直到该函数返回。
在 Delphi 中,我们有 TThread.Synchronize 来完成工作,但仅当 ThreadA 是主线程时。
有什么想法吗?我使用 Delphi 但也欢迎使用 C 代码回答。
要做到这一点,两个线程必须相互协作,因为并不存在跨线程触发事件的内置机制。不过,如果您愿意自己实现这样的机制,其实并不难;您当然可以查看 TThread.Synchronize 的源代码以获取提示。
借鉴 TThread.Synchronize 的源代码,我写出了下面的实现。您必须让协作线程在其主循环中检查自己的队列——这当然也正是 TThread.Synchronize 的工作方式。
以下代码基于我们在生产环境中使用的代码——如果其中有注释或引用指向了不在本单元中的内容,我深表歉意。代码没有提供返回函数结果的机制,但这可以通过使用不同的调用模板来解决(这样结果类型就是已知的)。我预留了一个 TObject 类型的结果字段(尽管目前无法知道它应该是什么),以便在需要时返回多值结果。
下面的代码不包含 Windows 专用代码,因此应该可以按您的要求在 Linux 上工作。
unit Unit1;
interface
uses Classes, SyncObjs, Generics.Collections;
type
{ One pending cross-thread call held in a TQueueableThread queue.
  The call target is stored either as a method pointer (Method) or as an
  anonymous procedure (Proc); the worker tries Method first, then Proc. }
TQueuedCallback = class(TObject)
protected
  // Call target when queued through the TThreadMethod overloads.
  _fnMethod: TThreadMethod;
  // Call target when queued through the TThreadProcedure overloads.
  _fnProcedure: TThreadProcedure;
  // Completion event; nil for fire-and-forget entries. When assigned, the
  // worker signals it after the call and the waiting caller frees the entry.
  _pEvent: TEvent;
  // Optional result slot. Nothing in this unit assigns it.
  _pResult: TObject;
public
  property Method: TThreadMethod read _fnMethod write _fnMethod;
  property Proc: TThreadProcedure read _fnProcedure write _fnProcedure;
  property Event: TEvent read _pEvent write _pEvent;
  property Result: TObject read _pResult write _pResult;
end;
{ A thread that other threads can hand procedures to. Execute waits on the
  handles in _haSignals and drains the queue when the queue event fires. }
TQueueableThread = class(TThread)
protected
  // Guards _pQueuedCalls.
  _pCSLock: TCriticalSection;
  // Pending calls, executed in FIFO order (index 0 first).
  _pQueuedCalls: TList<TQueuedCallback>;
  // Handles Execute waits on; AfterConstruction fills [0] with the queue
  // event and [1] with the stop event.
  _haSignals: THandleObjectArray;
  // Set by the QueueProcedure* methods after adding an entry.
  _pQueueEvent: TEvent;
  // Makes Execute terminate the thread when signalled.
  _pStopEvent: TEvent;
  // Timeout passed to WaitForMultiple (AfterConstruction sets 30000).
  _dwMaxWait: Cardinal;

  // Where the thread does its work. Execute passes the index of the
  // signalled object within _haSignals.
  procedure _DoWork(nEventIndex: Integer); virtual; abstract;
  // Runs the first queued call, or every queued call when blAll is True.
  procedure _ExecuteQueued(blAll: Boolean = False); virtual;
public
  destructor Destroy; override;
  procedure AfterConstruction(); override;
  procedure Execute(); override;

  // Fire-and-forget: enqueue and return immediately.
  procedure QueueProcedure(fnMethod: TThreadMethod); overload; virtual;
  procedure QueueProcedure(fnProcedure: TThreadProcedure); overload; virtual;

  // Enqueue and block until the entry's completion event is signalled.
  procedure QueueProcedureAndWait(fnMethod: TThreadMethod); overload; virtual;
  procedure QueueProcedureAndWait(fnProcedure: TThreadProcedure); overload; virtual;

  // As QueueProcedureAndWait, then return the entry's Result slot.
  function QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject; overload; virtual;
  function QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject; overload; virtual;
end;
implementation
uses SysUtils;
{ TQueueableThread }
{ Executes queued calls on the calling thread.
  blAll = False: run at most one entry. blAll = True: run until the queue is empty.

  Each entry is detached from the queue under the lock and then executed
  with the lock released, so a slow callback does not block producers, and
  an exception inside a callback can no longer leave the lock held.
  The completion event is signalled (or the fire-and-forget entry freed) in a
  finally block so a waiting caller is always released; the exception itself
  still propagates to the caller of this method. }
procedure TQueueableThread._ExecuteQueued(blAll: Boolean);
var
  pCall: TQueuedCallback;
  pDone: TEvent;
begin
  repeat
    pCall := nil;
    Self._pCSLock.Enter();
    try
      if Self._pQueuedCalls.Count > 0 then
      begin
        pCall := Self._pQueuedCalls.Items[0];
        Self._pQueuedCalls.Delete(0);
      end;
    finally
      Self._pCSLock.Leave();
    end;

    // Queue drained: nothing more to do regardless of blAll.
    if pCall = nil then
      Break;

    // Capture the event first: once it is signalled the waiting thread owns
    // and frees pCall, so pCall must not be touched afterwards.
    pDone := pCall.Event;
    try
      if Assigned(pCall.Method) then
        pCall.Method()
      else if Assigned(pCall.Proc) then
        pCall.Proc();
      // No mechanism for supplying a result ...
    finally
      if pDone <> nil then
        pDone.SetEvent()
      else
        pCall.Free;
    end;
  until not blAll;
end;
{ Stops the worker, releases any callers still waiting on queued entries and
  frees the synchronisation objects.

  The worker is stopped BEFORE the events, lock and queue are freed; otherwise
  Execute could still be blocked in WaitForMultiple on objects that no longer
  exist. The stop event is set so the worker wakes immediately instead of
  after _dwMaxWait. }
destructor TQueueableThread.Destroy;
begin
  // Only wait for a thread that is actually running, and never for ourselves
  // (e.g. FreeOnTerminate, or Free called from inside a queued callback).
  if Self.Started and (not Self.Finished) and
     (TThread.CurrentThread.ThreadID <> Self.ThreadID) then
  begin
    Self.Terminate;
    if Self._pStopEvent <> nil then
      Self._pStopEvent.SetEvent();
    Self.WaitFor;
  end;

  if Self._pQueuedCalls <> nil then
  begin
    // Fields may be nil if construction failed part-way, hence the checks.
    if Self._pCSLock <> nil then
      Self._pCSLock.Enter();
    try
      while Self._pQueuedCalls.Count > 0 do
      begin
        // Entries with an event belong to a waiting caller: wake it and let it
        // free the entry. Fire-and-forget entries are owned by the queue.
        if Self._pQueuedCalls.Items[0].Event <> nil then
          Self._pQueuedCalls.Items[0].Event.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free();
        Self._pQueuedCalls.Delete(0);
      end;
    finally
      if Self._pCSLock <> nil then
        Self._pCSLock.Leave();
    end;
    FreeAndNil(Self._pQueuedCalls);
  end;
  FreeAndNil(Self._pQueueEvent);
  FreeAndNil(Self._pStopEvent);
  FreeAndNil(Self._pCSLock);
  inherited;
end;
{ Creates the lock, the queue and the two wait handles, then lets TThread
  finish construction.

  inherited is called LAST on purpose: TThread.AfterConstruction starts a
  thread that was not created suspended, so everything Execute touches must
  already exist at that point. }
procedure TQueueableThread.AfterConstruction();
begin
  Self._pCSLock := TCriticalSection.Create();
  Self._pQueuedCalls := TList<TQueuedCallback>.Create();
  Self._pQueueEvent := TEvent.Create();
  Self._pStopEvent := TEvent.Create();

  // Slot 0 = "queue has entries", slot 1 = "stop requested".
  SetLength(Self._haSignals, 2);
  Self._haSignals[0] := Self._pQueueEvent;
  Self._haSignals[1] := Self._pStopEvent;

  // Wait timeout handed to WaitForMultiple in Execute.
  Self._dwMaxWait := 30000;

  inherited;
end;
{ Worker loop: waits on _haSignals and dispatches.
    - queue event  -> run every queued call
    - stop event   -> terminate
    - other handle -> _DoWork(index of that handle in _haSignals)
    - timeout      -> _DoWork(-1)
    - wait error   -> terminate

  Changes from the previous version:
    * The queue event is reset BEFORE draining. Resetting afterwards could
      erase a SetEvent issued by a producer between the end of the drain and
      the reset, leaving that entry (and its waiting caller) stuck.
      (The explicit reset implies a manual-reset event - TODO confirm the
      TEvent.Create() default on the target RTL.)
    * pSignalled is cleared before each wait so a timeout cannot be
      misinterpreted using the object signalled in a previous iteration.
    * The timeout case now reaches _DoWork(-1); previously the -1 was
      assigned but the call was nested in the non-timeout branch. }
procedure TQueueableThread.Execute();
var
  dwWaitResult: TWaitResult;
  nEventIndex: Integer;
  nLoop: Integer;
  pSignalled: THandleObject;
begin
  while not Self.Terminated do
  begin
    pSignalled := nil;
    dwWaitResult := THandleObject.WaitForMultiple(Self._haSignals, Self._dwMaxWait, False, pSignalled);
    if dwWaitResult = wrError then
      Self.Terminate;
    if Self.Terminated then
      Break;

    // -2 = nothing to hand to _DoWork, -1 = timeout, >= 0 = handle index.
    nEventIndex := -2;
    if dwWaitResult = wrTimeout then
      nEventIndex := -1
    else if pSignalled = Self._pQueueEvent then
    begin
      Self._pQueueEvent.ResetEvent();
      Self._ExecuteQueued(True);
    end
    else if pSignalled = Self._pStopEvent then
      Self.Terminate()
    else
    begin
      for nLoop := Low(Self._haSignals) to High(Self._haSignals) do
        if Self._haSignals[nLoop] = pSignalled then
        begin
          nEventIndex := nLoop;
          Break;
        end;
    end;

    if nEventIndex > -2 then
    begin
      try
        Self._DoWork(nEventIndex);
      except
        on Exception do
        begin
          // Kept from the original: exceptions from _DoWork are swallowed so
          // the worker keeps running. Add logging/error handling here.
        end;
      end;
    end;
  end;
end;
{ Queues fnMethod for execution on this thread and returns immediately.
  An unassigned fnMethod is ignored. The entry is freed by the worker after
  it has run. }
procedure TQueueableThread.QueueProcedure(fnMethod: TThreadMethod);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnMethod) then
    Exit;

  // Build the entry outside the lock; only the list update needs protecting.
  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    // Always release, even if Add/SetEvent raises.
    Self._pCSLock.Leave();
  end;
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread and
  returns immediately. An unassigned fnProcedure is ignored. The entry is
  freed by the worker after it has run. }
procedure TQueueableThread.QueueProcedure(fnProcedure: TThreadProcedure);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnProcedure) then
    Exit;

  // Build the entry outside the lock; only the list update needs protecting.
  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    // Always release, even if Add/SetEvent raises.
    Self._pCSLock.Leave();
  end;
end;
{ Queues fnMethod for execution on this thread and blocks the caller until
  the entry's completion event is signalled. An unassigned fnMethod is
  ignored.

  When called from this thread itself the method is run inline: queueing and
  waiting would block the only thread able to service the queue. }
procedure TQueueableThread.QueueProcedureAndWait(fnMethod: TThreadMethod);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnMethod) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnMethod();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // The waiter owns both the event and the entry once it has been signalled.
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread and
  blocks the caller until the entry's completion event is signalled. An
  unassigned fnProcedure is ignored.

  When called from this thread itself the procedure is run inline: queueing
  and waiting would block the only thread able to service the queue. }
procedure TQueueableThread.QueueProcedureAndWait(fnProcedure: TThreadProcedure);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnProcedure) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnProcedure();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // The waiter owns both the event and the entry once it has been signalled.
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues fnMethod for execution on this thread, blocks until the entry's
  completion event is signalled and returns the entry's Result slot.
  Returns nil when fnMethod is unassigned. Nothing in this unit fills the
  Result slot, so nil is returned unless a descendant does so.

  When called from this thread itself the method is run inline (returning
  nil): queueing and waiting would block the only thread able to service the
  queue. }
function TQueueableThread.QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject;
var
  pQueue: TQueuedCallback;
begin
  Result := nil;
  if not Assigned(fnMethod) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnMethod();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // Read the result before the entry is released.
  Result := pQueue._pResult;
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread,
  blocks until the entry's completion event is signalled and returns the
  entry's Result slot. Returns nil when fnProcedure is unassigned. Nothing in
  this unit fills the Result slot, so nil is returned unless a descendant
  does so.

  When called from this thread itself the procedure is run inline (returning
  nil): queueing and waiting would block the only thread able to service the
  queue. }
function TQueueableThread.QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject;
var
  pQueue: TQueuedCallback;
begin
  Result := nil;
  if not Assigned(fnProcedure) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnProcedure();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // Read the result before the entry is released.
  Result := pQueue._pResult;
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
end.
您可以从 TQueuedCallback 派生出使用特定调用模板的子类,这是确定返回值类型的一种方法。
在一个 Delphi/Linux 程序中,假设我有两个正在运行的线程 ThreadA 和 ThreadB。在某个时间点,ThreadB 需要让 ThreadA 执行一个函数,并阻塞直到该函数返回。
在 Delphi 中,我们有 TThread.Synchronize 来完成工作,但仅当 ThreadA 是主线程时。
有什么想法吗?我使用 Delphi 但也欢迎使用 C 代码回答。
要做到这一点,两个线程必须相互协作,因为并不存在跨线程触发事件的内置机制。不过,如果您愿意自己实现这样的机制,其实并不难;您当然可以查看 TThread.Synchronize 的源代码以获取提示。
借鉴 TThread.Synchronize 的源代码,我写出了下面的实现。您必须让协作线程在其主循环中检查自己的队列——这当然也正是 TThread.Synchronize 的工作方式。
以下代码基于我们在生产环境中使用的代码——如果其中有注释或引用指向了不在本单元中的内容,我深表歉意。代码没有提供返回函数结果的机制,但这可以通过使用不同的调用模板来解决(这样结果类型就是已知的)。我预留了一个 TObject 类型的结果字段(尽管目前无法知道它应该是什么),以便在需要时返回多值结果。
下面的代码不包含 Windows 专用代码,因此应该可以按您的要求在 Linux 上工作。
unit Unit1;
interface
uses Classes, SyncObjs, Generics.Collections;
type
{ One pending cross-thread call held in a TQueueableThread queue.
  The call target is stored either as a method pointer (Method) or as an
  anonymous procedure (Proc); the worker tries Method first, then Proc. }
TQueuedCallback = class(TObject)
protected
  // Call target when queued through the TThreadMethod overloads.
  _fnMethod: TThreadMethod;
  // Call target when queued through the TThreadProcedure overloads.
  _fnProcedure: TThreadProcedure;
  // Completion event; nil for fire-and-forget entries. When assigned, the
  // worker signals it after the call and the waiting caller frees the entry.
  _pEvent: TEvent;
  // Optional result slot. Nothing in this unit assigns it.
  _pResult: TObject;
public
  property Method: TThreadMethod read _fnMethod write _fnMethod;
  property Proc: TThreadProcedure read _fnProcedure write _fnProcedure;
  property Event: TEvent read _pEvent write _pEvent;
  property Result: TObject read _pResult write _pResult;
end;
{ A thread that other threads can hand procedures to. Execute waits on the
  handles in _haSignals and drains the queue when the queue event fires. }
TQueueableThread = class(TThread)
protected
  // Guards _pQueuedCalls.
  _pCSLock: TCriticalSection;
  // Pending calls, executed in FIFO order (index 0 first).
  _pQueuedCalls: TList<TQueuedCallback>;
  // Handles Execute waits on; AfterConstruction fills [0] with the queue
  // event and [1] with the stop event.
  _haSignals: THandleObjectArray;
  // Set by the QueueProcedure* methods after adding an entry.
  _pQueueEvent: TEvent;
  // Makes Execute terminate the thread when signalled.
  _pStopEvent: TEvent;
  // Timeout passed to WaitForMultiple (AfterConstruction sets 30000).
  _dwMaxWait: Cardinal;

  // Where the thread does its work. Execute passes the index of the
  // signalled object within _haSignals.
  procedure _DoWork(nEventIndex: Integer); virtual; abstract;
  // Runs the first queued call, or every queued call when blAll is True.
  procedure _ExecuteQueued(blAll: Boolean = False); virtual;
public
  destructor Destroy; override;
  procedure AfterConstruction(); override;
  procedure Execute(); override;

  // Fire-and-forget: enqueue and return immediately.
  procedure QueueProcedure(fnMethod: TThreadMethod); overload; virtual;
  procedure QueueProcedure(fnProcedure: TThreadProcedure); overload; virtual;

  // Enqueue and block until the entry's completion event is signalled.
  procedure QueueProcedureAndWait(fnMethod: TThreadMethod); overload; virtual;
  procedure QueueProcedureAndWait(fnProcedure: TThreadProcedure); overload; virtual;

  // As QueueProcedureAndWait, then return the entry's Result slot.
  function QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject; overload; virtual;
  function QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject; overload; virtual;
end;
implementation
uses SysUtils;
{ TQueueableThread }
{ Executes queued calls on the calling thread.
  blAll = False: run at most one entry. blAll = True: run until the queue is empty.

  Each entry is detached from the queue under the lock and then executed
  with the lock released, so a slow callback does not block producers, and
  an exception inside a callback can no longer leave the lock held.
  The completion event is signalled (or the fire-and-forget entry freed) in a
  finally block so a waiting caller is always released; the exception itself
  still propagates to the caller of this method. }
procedure TQueueableThread._ExecuteQueued(blAll: Boolean);
var
  pCall: TQueuedCallback;
  pDone: TEvent;
begin
  repeat
    pCall := nil;
    Self._pCSLock.Enter();
    try
      if Self._pQueuedCalls.Count > 0 then
      begin
        pCall := Self._pQueuedCalls.Items[0];
        Self._pQueuedCalls.Delete(0);
      end;
    finally
      Self._pCSLock.Leave();
    end;

    // Queue drained: nothing more to do regardless of blAll.
    if pCall = nil then
      Break;

    // Capture the event first: once it is signalled the waiting thread owns
    // and frees pCall, so pCall must not be touched afterwards.
    pDone := pCall.Event;
    try
      if Assigned(pCall.Method) then
        pCall.Method()
      else if Assigned(pCall.Proc) then
        pCall.Proc();
      // No mechanism for supplying a result ...
    finally
      if pDone <> nil then
        pDone.SetEvent()
      else
        pCall.Free;
    end;
  until not blAll;
end;
{ Stops the worker, releases any callers still waiting on queued entries and
  frees the synchronisation objects.

  The worker is stopped BEFORE the events, lock and queue are freed; otherwise
  Execute could still be blocked in WaitForMultiple on objects that no longer
  exist. The stop event is set so the worker wakes immediately instead of
  after _dwMaxWait. }
destructor TQueueableThread.Destroy;
begin
  // Only wait for a thread that is actually running, and never for ourselves
  // (e.g. FreeOnTerminate, or Free called from inside a queued callback).
  if Self.Started and (not Self.Finished) and
     (TThread.CurrentThread.ThreadID <> Self.ThreadID) then
  begin
    Self.Terminate;
    if Self._pStopEvent <> nil then
      Self._pStopEvent.SetEvent();
    Self.WaitFor;
  end;

  if Self._pQueuedCalls <> nil then
  begin
    // Fields may be nil if construction failed part-way, hence the checks.
    if Self._pCSLock <> nil then
      Self._pCSLock.Enter();
    try
      while Self._pQueuedCalls.Count > 0 do
      begin
        // Entries with an event belong to a waiting caller: wake it and let it
        // free the entry. Fire-and-forget entries are owned by the queue.
        if Self._pQueuedCalls.Items[0].Event <> nil then
          Self._pQueuedCalls.Items[0].Event.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free();
        Self._pQueuedCalls.Delete(0);
      end;
    finally
      if Self._pCSLock <> nil then
        Self._pCSLock.Leave();
    end;
    FreeAndNil(Self._pQueuedCalls);
  end;
  FreeAndNil(Self._pQueueEvent);
  FreeAndNil(Self._pStopEvent);
  FreeAndNil(Self._pCSLock);
  inherited;
end;
{ Creates the lock, the queue and the two wait handles, then lets TThread
  finish construction.

  inherited is called LAST on purpose: TThread.AfterConstruction starts a
  thread that was not created suspended, so everything Execute touches must
  already exist at that point. }
procedure TQueueableThread.AfterConstruction();
begin
  Self._pCSLock := TCriticalSection.Create();
  Self._pQueuedCalls := TList<TQueuedCallback>.Create();
  Self._pQueueEvent := TEvent.Create();
  Self._pStopEvent := TEvent.Create();

  // Slot 0 = "queue has entries", slot 1 = "stop requested".
  SetLength(Self._haSignals, 2);
  Self._haSignals[0] := Self._pQueueEvent;
  Self._haSignals[1] := Self._pStopEvent;

  // Wait timeout handed to WaitForMultiple in Execute.
  Self._dwMaxWait := 30000;

  inherited;
end;
{ Worker loop: waits on _haSignals and dispatches.
    - queue event  -> run every queued call
    - stop event   -> terminate
    - other handle -> _DoWork(index of that handle in _haSignals)
    - timeout      -> _DoWork(-1)
    - wait error   -> terminate

  Changes from the previous version:
    * The queue event is reset BEFORE draining. Resetting afterwards could
      erase a SetEvent issued by a producer between the end of the drain and
      the reset, leaving that entry (and its waiting caller) stuck.
      (The explicit reset implies a manual-reset event - TODO confirm the
      TEvent.Create() default on the target RTL.)
    * pSignalled is cleared before each wait so a timeout cannot be
      misinterpreted using the object signalled in a previous iteration.
    * The timeout case now reaches _DoWork(-1); previously the -1 was
      assigned but the call was nested in the non-timeout branch. }
procedure TQueueableThread.Execute();
var
  dwWaitResult: TWaitResult;
  nEventIndex: Integer;
  nLoop: Integer;
  pSignalled: THandleObject;
begin
  while not Self.Terminated do
  begin
    pSignalled := nil;
    dwWaitResult := THandleObject.WaitForMultiple(Self._haSignals, Self._dwMaxWait, False, pSignalled);
    if dwWaitResult = wrError then
      Self.Terminate;
    if Self.Terminated then
      Break;

    // -2 = nothing to hand to _DoWork, -1 = timeout, >= 0 = handle index.
    nEventIndex := -2;
    if dwWaitResult = wrTimeout then
      nEventIndex := -1
    else if pSignalled = Self._pQueueEvent then
    begin
      Self._pQueueEvent.ResetEvent();
      Self._ExecuteQueued(True);
    end
    else if pSignalled = Self._pStopEvent then
      Self.Terminate()
    else
    begin
      for nLoop := Low(Self._haSignals) to High(Self._haSignals) do
        if Self._haSignals[nLoop] = pSignalled then
        begin
          nEventIndex := nLoop;
          Break;
        end;
    end;

    if nEventIndex > -2 then
    begin
      try
        Self._DoWork(nEventIndex);
      except
        on Exception do
        begin
          // Kept from the original: exceptions from _DoWork are swallowed so
          // the worker keeps running. Add logging/error handling here.
        end;
      end;
    end;
  end;
end;
{ Queues fnMethod for execution on this thread and returns immediately.
  An unassigned fnMethod is ignored. The entry is freed by the worker after
  it has run. }
procedure TQueueableThread.QueueProcedure(fnMethod: TThreadMethod);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnMethod) then
    Exit;

  // Build the entry outside the lock; only the list update needs protecting.
  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    // Always release, even if Add/SetEvent raises.
    Self._pCSLock.Leave();
  end;
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread and
  returns immediately. An unassigned fnProcedure is ignored. The entry is
  freed by the worker after it has run. }
procedure TQueueableThread.QueueProcedure(fnProcedure: TThreadProcedure);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnProcedure) then
    Exit;

  // Build the entry outside the lock; only the list update needs protecting.
  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    // Always release, even if Add/SetEvent raises.
    Self._pCSLock.Leave();
  end;
end;
{ Queues fnMethod for execution on this thread and blocks the caller until
  the entry's completion event is signalled. An unassigned fnMethod is
  ignored.

  When called from this thread itself the method is run inline: queueing and
  waiting would block the only thread able to service the queue. }
procedure TQueueableThread.QueueProcedureAndWait(fnMethod: TThreadMethod);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnMethod) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnMethod();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // The waiter owns both the event and the entry once it has been signalled.
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread and
  blocks the caller until the entry's completion event is signalled. An
  unassigned fnProcedure is ignored.

  When called from this thread itself the procedure is run inline: queueing
  and waiting would block the only thread able to service the queue. }
procedure TQueueableThread.QueueProcedureAndWait(fnProcedure: TThreadProcedure);
var
  pQueue: TQueuedCallback;
begin
  if not Assigned(fnProcedure) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnProcedure();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // The waiter owns both the event and the entry once it has been signalled.
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues fnMethod for execution on this thread, blocks until the entry's
  completion event is signalled and returns the entry's Result slot.
  Returns nil when fnMethod is unassigned. Nothing in this unit fills the
  Result slot, so nil is returned unless a descendant does so.

  When called from this thread itself the method is run inline (returning
  nil): queueing and waiting would block the only thread able to service the
  queue. }
function TQueueableThread.QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject;
var
  pQueue: TQueuedCallback;
begin
  Result := nil;
  if not Assigned(fnMethod) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnMethod();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Method := fnMethod;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // Read the result before the entry is released.
  Result := pQueue._pResult;
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
{ Queues the anonymous procedure fnProcedure for execution on this thread,
  blocks until the entry's completion event is signalled and returns the
  entry's Result slot. Returns nil when fnProcedure is unassigned. Nothing in
  this unit fills the Result slot, so nil is returned unless a descendant
  does so.

  When called from this thread itself the procedure is run inline (returning
  nil): queueing and waiting would block the only thread able to service the
  queue. }
function TQueueableThread.QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject;
var
  pQueue: TQueuedCallback;
begin
  Result := nil;
  if not Assigned(fnProcedure) then
    Exit;

  if TThread.CurrentThread.ThreadID = Self.ThreadID then
  begin
    fnProcedure();
    Exit;
  end;

  pQueue := TQueuedCallback.Create();
  pQueue.Proc := fnProcedure;
  pQueue.Event := TEvent.Create();

  Self._pCSLock.Enter();
  try
    Self._pQueuedCalls.Add(pQueue);
    Self._pQueueEvent.SetEvent();
  finally
    Self._pCSLock.Leave();
  end;

  pQueue._pEvent.WaitFor(INFINITE);
  // Read the result before the entry is released.
  Result := pQueue._pResult;
  FreeAndNil(pQueue._pEvent);
  FreeAndNil(pQueue);
end;
end.
您可以从 TQueuedCallback 派生出使用特定调用模板的子类,这将是识别返回值