使用 Devart 的 PgDAC 组件在 Delphi 的单独线程中处理来自 PostgreSQL 的 NOTICE 事件
Handling NOTICE events from PostgreSQL in a separate thread in Delphi with Devart's PgDAC components
我想在单独的线程中执行长查询,以便能够中止它们并向用户提供反馈。所有这一切都有效,但我有时会遇到访问冲突,因为我认为 OnNotice 事件的处理方式不正确,我想知道这样做的正确方法。
我在 Delphi 2010 上使用 Devart 的 PgDAC 和 OmniThreadLibrary。
我执行的 PostgreSQL 代码是一个包含如下内容的存储过程:
RAISE NOTICE 'ad: %',myad.name;
以下是我的代码中有趣的部分:
procedure TFDecomptes.FormCreate(Sender: TObject);
begin
ThreadConnection := TPgConnection.Create(Self);
ThreadConnection.Assign(DM.PgConnection1);
end;
此 ThreadConnection
是将用于执行查询的 TPgConnection(在单独的线程中)。
procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
ThreadConnection.BreakExec;
end;
这就是 "Interrupt query" 按钮的作用。我不确定这是否非常 "thread safe" 因为它在主线程中使用但在专用于 query-execution 线程的 TPgConnection 上做了一些事情。
procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
case msg.MsgID of
1: begin
CalculationError:=msg.MsgData.AsString;
end;
end;
end;
这是我显示线程执行期间发生的错误(例如 SQL 错误或查询取消)的地方。
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
var s:String;
begin
s:=Errors[Errors.Count-1].ToString;
if copy(s,1,4)='ad: ' then begin
delete(s,1,4);
LAD.Caption:=s;
end;
end;
这是OnNotice事件处理。它所做的只是修改标签的标题。
procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
q:=Task.Param['pgsql'];
Try
q.Execute;
Except
On E:Exception do task.Comm.Send(1,e.Message);
End;
end;
procedure TFDecomptes.StartClick(Sender: TObject);
begin
ThreadConnection.OnNotice:=PgConnectionNotice;
Timer1.Enabled:=True;
CalculationTask := CreateTask(InternalExecQuery, 'CalculeDecomptes')
.MonitorWith(OmniEventMonitor1)
.SetParameter('pgsql', PgSQL)
.Run;
end;
这就是查询的方式 运行。
所以PgConnectionNotice
事件(主线程中的运行ning)附加到ThreadConnection
(在query-execution线程中使用),这就是我怀疑生成这些随机访问违规。
我不知道该如何处理。我应该在 PgConnectionNotice 内部使用某种 lock
(同步吗?)。
这是我试过的:
procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
case msg.MsgID of
1: begin
CalculationError:=msg.MsgData.AsString;
end;
2: begin
lad.caption:='Here';
end;
end;
end;
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
// I am not using the passed string in this test
CalculationTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;
OmniEventMonitor1TaskMessage
从未收到在 PgConnectionNotice 中发送的消息(MsgId=2)。
我试过使用 CalculationTask.Invoke
但不明白如何调用它来传递字符串参数(我认为 Delphi 2010 不允许匿名函数)。
当我尝试像这样取消查询的更简单操作时,它停止取消查询:
procedure TFDecomptes.DoTheInterrupt;
begin
ThreadConnection.BreakExec;
end;
procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
CalculationTask.Invoke(DoTheInterrupt);
end;
所以我想我不应该通过 CalculationTask
拨打电话。我应该将在 InternalExecQuery
中创建的任务存储在全局变量中并使用它吗?
主要问题是我混淆了 IOmniTask
和 IOmniTaskControl
。 IOmniTask
是用于向主线程发送消息的后台接口,而 IOmniTaskControl
是主线程的接口,用于与后台任务通信。
所以在 PgConnectionNotice
中使用 CalculationTask(这是一个 IOmniTaskControl
)是一个双重错误:因为 PgConnectionNotice
是从后台线程中触发的,所以我正在向后台发送消息线程,来自后台线程,使用主线程的变量。
所以我添加了一个名为RunningTask
的全局变量:
Var RunningTask : IOmniTask;
在表单的 OnCreate
中将其设置为 nil
并像这样修改任务的代码:
procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
RunningTask := task;
try
q:=Task.Param['pgsql'];
Try
q.Execute;
Except
On E:Exception do task.Comm.Send(1,e.Message);
End;
finally
RunningTask := Nil;
end;
end;
OnNotice
事件现在看起来像:
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
if RunningTask=Nil then
// do nothing, old, pending notices
else
RunningTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;
我知道定义一个全局变量并不干净,尽管我知道最多只有一个后台任务。我可能应该将 IOmniTask
引用存储在 ThreadConnection
中,因为这就是 Sender
在 PgConnectionNotice
.
中的内容
我想在单独的线程中执行长查询,以便能够中止它们并向用户提供反馈。所有这一切都有效,但我有时会遇到访问冲突,因为我认为 OnNotice 事件的处理方式不正确,我想知道这样做的正确方法。
我在 Delphi 2010 上使用 Devart 的 PgDAC 和 OmniThreadLibrary。
我执行的 PostgreSQL 代码是一个包含如下内容的存储过程:
RAISE NOTICE 'ad: %',myad.name;
以下是我的代码中有趣的部分:
procedure TFDecomptes.FormCreate(Sender: TObject);
begin
ThreadConnection := TPgConnection.Create(Self);
ThreadConnection.Assign(DM.PgConnection1);
end;
此 ThreadConnection
是将用于执行查询的 TPgConnection(在单独的线程中)。
procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
ThreadConnection.BreakExec;
end;
这就是 "Interrupt query" 按钮的作用。我不确定这是否非常 "thread safe" 因为它在主线程中使用但在专用于 query-execution 线程的 TPgConnection 上做了一些事情。
procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
case msg.MsgID of
1: begin
CalculationError:=msg.MsgData.AsString;
end;
end;
end;
这是我显示线程执行期间发生的错误(例如 SQL 错误或查询取消)的地方。
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
var s:String;
begin
s:=Errors[Errors.Count-1].ToString;
if copy(s,1,4)='ad: ' then begin
delete(s,1,4);
LAD.Caption:=s;
end;
end;
这是OnNotice事件处理。它所做的只是修改标签的标题。
procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
q:=Task.Param['pgsql'];
Try
q.Execute;
Except
On E:Exception do task.Comm.Send(1,e.Message);
End;
end;
procedure TFDecomptes.StartClick(Sender: TObject);
begin
ThreadConnection.OnNotice:=PgConnectionNotice;
Timer1.Enabled:=True;
CalculationTask := CreateTask(InternalExecQuery, 'CalculeDecomptes')
.MonitorWith(OmniEventMonitor1)
.SetParameter('pgsql', PgSQL)
.Run;
end;
这就是查询的方式 运行。
所以PgConnectionNotice
事件(主线程中的运行ning)附加到ThreadConnection
(在query-execution线程中使用),这就是我怀疑生成这些随机访问违规。
我不知道该如何处理。我应该在 PgConnectionNotice 内部使用某种 lock
(同步吗?)。
这是我试过的:
procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
case msg.MsgID of
1: begin
CalculationError:=msg.MsgData.AsString;
end;
2: begin
lad.caption:='Here';
end;
end;
end;
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
// I am not using the passed string in this test
CalculationTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;
OmniEventMonitor1TaskMessage
从未收到在 PgConnectionNotice 中发送的消息(MsgId=2)。
我试过使用 CalculationTask.Invoke
但不明白如何调用它来传递字符串参数(我认为 Delphi 2010 不允许匿名函数)。
当我尝试像这样取消查询的更简单操作时,它停止取消查询:
procedure TFDecomptes.DoTheInterrupt;
begin
ThreadConnection.BreakExec;
end;
procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
CalculationTask.Invoke(DoTheInterrupt);
end;
所以我想我不应该通过 CalculationTask
拨打电话。我应该将在 InternalExecQuery
中创建的任务存储在全局变量中并使用它吗?
主要问题是我混淆了 IOmniTask
和 IOmniTaskControl
。 IOmniTask
是用于向主线程发送消息的后台接口,而 IOmniTaskControl
是主线程的接口,用于与后台任务通信。
所以在 PgConnectionNotice
中使用 CalculationTask(这是一个 IOmniTaskControl
)是一个双重错误:因为 PgConnectionNotice
是从后台线程中触发的,所以我正在向后台发送消息线程,来自后台线程,使用主线程的变量。
所以我添加了一个名为RunningTask
的全局变量:
Var RunningTask : IOmniTask;
在表单的 OnCreate
中将其设置为 nil
并像这样修改任务的代码:
procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
RunningTask := task;
try
q:=Task.Param['pgsql'];
Try
q.Execute;
Except
On E:Exception do task.Comm.Send(1,e.Message);
End;
finally
RunningTask := Nil;
end;
end;
OnNotice
事件现在看起来像:
procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
if RunningTask=Nil then
// do nothing, old, pending notices
else
RunningTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;
我知道定义一个全局变量并不干净,尽管我知道最多只有一个后台任务。我可能应该将 IOmniTask
引用存储在 ThreadConnection
中,因为这就是 Sender
在 PgConnectionNotice
.