使用 Devart 的 PgDAC 组件在 Delphi 的单独线程中处理来自 PostgreSQL 的 NOTICE 事件

Handling NOTICE events from PostgreSQL in a separate thread in Delphi with Devart's PgDAC components

我想在单独的线程中执行长查询,以便能够中止它们并向用户提供反馈。所有这一切都有效,但我有时会遇到访问冲突,因为我认为 OnNotice 事件的处理方式不正确,我想知道这样做的正确方法。

我在 Delphi 2010 上使用 Devart 的 PgDAC 和 OmniThreadLibrary。

我执行的 PostgreSQL 代码是一个包含如下内容的存储过程:

RAISE NOTICE 'ad: %',myad.name;

以下是我的代码中有趣的部分:

procedure TFDecomptes.FormCreate(Sender: TObject);
begin
  ThreadConnection := TPgConnection.Create(Self);
  ThreadConnection.Assign(DM.PgConnection1);
end;

ThreadConnection 是将用于执行查询的 TPgConnection(在单独的线程中)。

procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
  ThreadConnection.BreakExec;
end;

这就是 "Interrupt query" 按钮的作用。我不确定这是否非常 "thread safe" 因为它在主线程中使用但在专用于 query-execution 线程的 TPgConnection 上做了一些事情。

procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
  case msg.MsgID of
    1: begin
         CalculationError:=msg.MsgData.AsString;
       end;
  end;
end;

这是我显示线程执行期间发生的错误(例如 SQL 错误或查询取消)的地方。

procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
var s:String;
begin
  s:=Errors[Errors.Count-1].ToString;
  if copy(s,1,4)='ad: ' then begin
    delete(s,1,4);
    LAD.Caption:=s;
  end;
end;

这是OnNotice事件处理。它所做的只是修改标签的标题。

procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
  q:=Task.Param['pgsql'];
  Try
    q.Execute;
  Except
    On E:Exception do task.Comm.Send(1,e.Message);
  End;
end;

procedure TFDecomptes.StartClick(Sender: TObject);
begin
  ThreadConnection.OnNotice:=PgConnectionNotice;
  Timer1.Enabled:=True;
  CalculationTask := CreateTask(InternalExecQuery, 'CalculeDecomptes')
    .MonitorWith(OmniEventMonitor1)
    .SetParameter('pgsql', PgSQL)
    .Run;
end;

这就是查询的方式 运行。

所以PgConnectionNotice事件(主线程中的运行ning)附加到ThreadConnection(在query-execution线程中使用),这就是我怀疑生成这些随机访问违规。

我不知道该如何处理。我应该在 PgConnectionNotice 内部使用某种 lock(同步吗?)。

这是我试过的:

procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
  case msg.MsgID of
    1: begin
         CalculationError:=msg.MsgData.AsString;
       end;

    2: begin
         lad.caption:='Here';
       end;
  end;
end;

procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
  // I am not using the passed string in this test
  CalculationTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;

OmniEventMonitor1TaskMessage 从未收到在 PgConnectionNotice 中发送的消息(MsgId=2)。

我试过使用 CalculationTask.Invoke 但不明白如何调用它来传递字符串参数(我认为 Delphi 2010 不允许匿名函数)。

当我尝试像这样取消查询的更简单操作时,它停止取消查询:

procedure TFDecomptes.DoTheInterrupt;
begin
  ThreadConnection.BreakExec;
end;

procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
  CalculationTask.Invoke(DoTheInterrupt);
end;

所以我想我不应该通过 CalculationTask 拨打电话。我应该将在 InternalExecQuery 中创建的任务存储在全局变量中并使用它吗?

主要问题是我混淆了 IOmniTaskIOmniTaskControlIOmniTask 是用于向主线程发送消息的后台接口,而 IOmniTaskControl 是主线程的接口,用于与后台任务通信。

所以在 PgConnectionNotice 中使用 CalculationTask(这是一个 IOmniTaskControl)是一个双重错误:因为 PgConnectionNotice 是从后台线程中触发的,所以我正在向后台发送消息线程,来自后台线程,使用主线程的变量。

所以我添加了一个名为RunningTask的全局变量:

Var RunningTask : IOmniTask;

在表单的 OnCreate 中将其设置为 nil 并像这样修改任务的代码:

procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
  RunningTask := task;
  try
    q:=Task.Param['pgsql'];
    Try
      q.Execute;
    Except
      On E:Exception do task.Comm.Send(1,e.Message);
    End;
  finally
    RunningTask := Nil;
  end;
end;

OnNotice 事件现在看起来像:

procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
  if RunningTask=Nil then
    // do nothing, old, pending notices
  else
    RunningTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;

我知道定义一个全局变量并不干净,尽管我知道最多只有一个后台任务。我可能应该将 IOmniTask 引用存储在 ThreadConnection 中,因为这就是 SenderPgConnectionNotice.

中的内容