如何知道 OmniThreadLibrary 中流水线阶段的状态?
How to know the state of Pipeline stages in OmniThreadLibrary?
此前的一个回答展示了使用 Parallel.Pipeline 进行数据处理的示例。
目前我需要知道 Pipeline 何时启动以及其所有阶段何时完成。我阅读了 gabr 对另一个问题 "How to monitor Pipeline stages in OmniThreadLibrary?" 的回答,并尝试按如下方式实现(根据该回答做了修改):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form of the counter-based sample: hosts the Start/Stop buttons and the
// log list box, and owns the pipeline plus the two counters its stages share.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Written by Async_Files after each scan, read by Async_JSON.
FCounterTotal: IOmniCounter;
// Incremented by Async_JSON for every item it receives.
FCounterProcessed: IOmniCounter;
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage. While the pipeline input is not completed it sends
// WM_STARTED, queues every '*.txt' file found next to the executable into
// `output`, stores the number of queued files in FCounterTotal and then waits
// before the next scan.
// NOTE(review): FCounterTotal is assigned only after all files have been queued
// (each followed by a one-second Sleep), so Async_JSON can finish the last file
// before the total is stored and never observe matching counters. This is the
// race condition the accompanying text describes.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i, cnt: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // sent once at the start of every scan cycle
cnt := 0;
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f);
Inc(cnt);
Sleep(1000); // simulate work
end;
FCounterTotal.Value := cnt;
// The folder has to be polled for new files indefinitely, with a period of
// about one minute (60 s).
// NOTE(review): starting at 60 and looping until i < 0 allows up to 61 sleeps.
i := 60;
repeat
Sleep(1000); // wake up every second so a Stop request (input completed) is noticed
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Second pipeline stage: loads the file named by `input`, hands its text to
// the external GetJSON_ routine and stores the SO()-parsed result in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    // GetJSON_ is imported from my.dll and fills jsonText.
    GetJSON_(PChar(fileLines.Text), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Third pipeline stage: consumes the parsed objects and sends WM_ENDED when
// the number of processed items equals FCounterTotal.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
  processed: integer;
begin
  for item in input do
  begin
    parsed := item.AsInterface as ISuperObject;
    // do something with JSON
    processed := FCounterProcessed.Increment;
    // The comparison happens only once per received item.
    if FCounterTotal.Value = processed then
      task.Comm.Send(WM_ENDED); // !!! message is not sent
  end;
end;
//
// Start button handler: disables the button, creates fresh counters and
// builds and runs the three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.
task.Comm.Send(WM_STARTED) 这一行一切正常,但 task.Comm.Send(WM_ENDED) 这一行永远不会执行。我怎么知道最后一个阶段何时完成?正确的做法是什么?
你的方法(我最初提出的)有一个竞争条件,阻止它工作。 (抱歉,这是我最初设计中的一个缺陷。)
基本上,会发生什么:
- Async_Files 将最后一个文件发送到管道。
- Async_Files 阻塞(Sleep,模拟一些工作量)。
- Async_JSON 接收并处理最后一个文件。
- Async_Files 现在设置 FCounterTotal 计数器。
在那一刻,Async_JSON 已经在等待下一个数据,它永远不会到来,并且不再检查 FCounterTotal 值。
另一种方法是将一个特殊的哨兵(sentinel)值作为最后一项发送到管道中。
异常也可以作为哨兵。如果您在第一阶段引发异常,它会沿着管道"流动"到末端,您可以在那里处理它。无需在任何特定阶段做任何特殊处理 - 默认情况下,每个阶段只会重新引发该异常。
感谢 gabr,他关于使用特殊哨兵(sentinel)值的建议帮助我找到了解决问题的方法。以下代码按预期工作:
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form of the sentinel-value sample: hosts the Start/Stop buttons and the
// log list box, and owns the running pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage (sentinel variant). Until the pipeline input is
// completed it repeatedly:
//   1. sends WM_STARTED,
//   2. queues every '*.txt' file found next to the executable into `output`,
//   3. queues the integer 0 as a sentinel marking the end of the batch,
//   4. waits CPollPeriodSec seconds before scanning again.
// The original countdown (i := 60 ... until i < 0) slept up to 61 times; the
// bounded for-loop below waits exactly CPollPeriodSec seconds at most.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CPollPeriodSec = 60; // pause between two folder scans, in seconds
var
  elapsedSec: integer;
  fileName: string;
begin
  while not input.IsCompleted do
  begin
    task.Comm.Send(WM_STARTED); // sent once at the start of every scan cycle
    for fileName in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(fileName);
      Sleep(1000); // simulate work
    end;
    output.TryAdd(0); // sentinel: tells the later stages the batch is complete
    // Sleep in one-second slices so a Stop request (input completed) is
    // noticed within a second instead of after the whole period.
    for elapsedSec := 1 to CPollPeriodSec do
    begin
      Sleep(1000);
      if input.IsCompleted then
        Break;
    end;
  end;
end;
// Second pipeline stage (sentinel variant). An integer 0 input is the sentinel
// and is forwarded unchanged; any other input is treated as a file name whose
// text is converted by GetJSON_ and parsed with SO() into `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  if input.IsInteger and (input.AsInteger = 0) then
    output := 0 // if we got 'sentinel' value send it to the next stage
  else
  begin
    fileLines := TStringList.Create;
    try
      fileLines.LoadFromFile(input.AsString);
      // GetJSON_ is imported from my.dll and fills jsonText.
      GetJSON_(PChar(fileLines.Text), jsonText);
      output := SO(jsonText);
      // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
    finally
      fileLines.Free;
    end;
  end;
end;
// Third pipeline stage (sentinel variant): sends WM_ENDED for every integer 0
// sentinel it receives and treats every other item as an ISuperObject.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
begin
  for item in input do
    if item.IsInteger and (item.AsInteger = 0) then
      task.Comm.Send(WM_ENDED) // if we got 'sentinel' value
    else
    begin
      parsed := item.AsInterface as ISuperObject;
      // do something with JSON
    end;
end;
//
// Start button handler: disables the button, then builds and runs the
// three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.
使用 Exception 作为哨兵的替代方案(尚未奏效,但我可能做错了什么):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
ESentinelException = class(Exception);
// Main form of the exception-sentinel sample: hosts the Start/Stop buttons and
// the log list box, and owns the running pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage (exception-sentinel variant). Sends WM_STARTED, queues
// every '*.txt' file found next to the executable into `output`, then raises
// ESentinelException.
// NOTE(review): the raise is unconditional, so the waiting loop below it is
// unreachable and the procedure leaves through the exception after the first
// scan. How the pipeline forwards that exception to later stages is not
// visible here - confirm against the OmniThreadLibrary documentation.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // sent at the start of the scan
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f);
Sleep(1000); // simulate work
end;
raise ESentinelException.Create('sentinel');
// The folder was meant to be polled for new files indefinitely, with a
// period of about one minute (60 s); the code below never runs because of
// the raise above.
i := 60;
repeat
Sleep(1000); // wake up every second so a Stop request (input completed) is noticed
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Second pipeline stage: loads the file named by `input`, hands its text to
// the external GetJSON_ routine and stores the SO()-parsed result in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    // GetJSON_ is imported from my.dll and fills jsonText.
    GetJSON_(PChar(fileLines.Text), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Third pipeline stage (exception-sentinel variant): an item carrying an
// ESentinelException triggers WM_ENDED and the exception object is freed;
// every other item is treated as an ISuperObject.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
  isSentinel: boolean;
begin
  for item in input do
  begin
    isSentinel := item.IsException and (item.AsException is ESentinelException);
    if isSentinel then
    begin
      task.Comm.Send(WM_ENDED); // if we got 'sentinel' Exception
      item.AsException.Free;
    end
    else
    begin
      parsed := item.AsInterface as ISuperObject;
      // do something with JSON
    end;
  end;
end;
//
// Start button handler: disables the button, then builds the three-stage
// pipeline with HandleExceptions enabled and runs it.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.HandleExceptions;
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.
目前我需要知道 Pipeline 何时启动以及其所有阶段何时完成。我阅读了 gabr 对另一个问题 "How to monitor Pipeline stages in OmniThreadLibrary?" 的回答,并尝试按如下方式实现(根据该回答做了修改):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form of the counter-based sample: hosts the Start/Stop buttons and the
// log list box, and owns the pipeline plus the two counters its stages share.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Written by Async_Files after each scan, read by Async_JSON.
FCounterTotal: IOmniCounter;
// Incremented by Async_JSON for every item it receives.
FCounterProcessed: IOmniCounter;
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage. While the pipeline input is not completed it sends
// WM_STARTED, queues every '*.txt' file found next to the executable into
// `output`, stores the number of queued files in FCounterTotal and then waits
// before the next scan.
// NOTE(review): FCounterTotal is assigned only after all files have been queued
// (each followed by a one-second Sleep), so Async_JSON can finish the last file
// before the total is stored and never observe matching counters. This is the
// race condition the accompanying text describes.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i, cnt: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // sent once at the start of every scan cycle
cnt := 0;
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f);
Inc(cnt);
Sleep(1000); // simulate work
end;
FCounterTotal.Value := cnt;
// The folder has to be polled for new files indefinitely, with a period of
// about one minute (60 s).
// NOTE(review): starting at 60 and looping until i < 0 allows up to 61 sleeps.
i := 60;
repeat
Sleep(1000); // wake up every second so a Stop request (input completed) is noticed
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Second pipeline stage: loads the file named by `input`, hands its text to
// the external GetJSON_ routine and stores the SO()-parsed result in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    // GetJSON_ is imported from my.dll and fills jsonText.
    GetJSON_(PChar(fileLines.Text), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Third pipeline stage: consumes the parsed objects and sends WM_ENDED when
// the number of processed items equals FCounterTotal.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
  processed: integer;
begin
  for item in input do
  begin
    parsed := item.AsInterface as ISuperObject;
    // do something with JSON
    processed := FCounterProcessed.Increment;
    // The comparison happens only once per received item.
    if FCounterTotal.Value = processed then
      task.Comm.Send(WM_ENDED); // !!! message is not sent
  end;
end;
//
// Start button handler: disables the button, creates fresh counters and
// builds and runs the three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.
task.Comm.Send(WM_STARTED) 这一行一切正常,但 task.Comm.Send(WM_ENDED) 这一行永远不会执行。我怎么知道最后一个阶段何时完成?正确的做法是什么?
你的方法(我最初提出的)有一个竞争条件,阻止它工作。 (抱歉,这是我最初设计中的一个缺陷。)
基本上,会发生什么:
- Async_Files 将最后一个文件发送到管道。
- Async_Files 阻塞(Sleep,模拟一些工作量)。
- Async_JSON 接收并处理最后一个文件。
- Async_Files 现在设置 FCounterTotal 计数器。
在那一刻,Async_JSON 已经在等待下一个数据,它永远不会到来,并且不再检查 FCounterTotal 值。
另一种方法是将一个特殊的哨兵(sentinel)值作为最后一项发送到管道中。
异常也可以作为哨兵。如果您在第一阶段引发异常,它会沿着管道"流动"到末端,您可以在那里处理它。无需在任何特定阶段做任何特殊处理 - 默认情况下,每个阶段只会重新引发该异常。
感谢 gabr,他关于使用特殊哨兵(sentinel)值的建议帮助我找到了解决问题的方法。以下代码按预期工作:
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
// Main form of the sentinel-value sample: hosts the Start/Stop buttons and the
// log list box, and owns the running pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage (sentinel variant). Until the pipeline input is
// completed it repeatedly:
//   1. sends WM_STARTED,
//   2. queues every '*.txt' file found next to the executable into `output`,
//   3. queues the integer 0 as a sentinel marking the end of the batch,
//   4. waits CPollPeriodSec seconds before scanning again.
// The original countdown (i := 60 ... until i < 0) slept up to 61 times; the
// bounded for-loop below waits exactly CPollPeriodSec seconds at most.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
const
  CPollPeriodSec = 60; // pause between two folder scans, in seconds
var
  elapsedSec: integer;
  fileName: string;
begin
  while not input.IsCompleted do
  begin
    task.Comm.Send(WM_STARTED); // sent once at the start of every scan cycle
    for fileName in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(fileName);
      Sleep(1000); // simulate work
    end;
    output.TryAdd(0); // sentinel: tells the later stages the batch is complete
    // Sleep in one-second slices so a Stop request (input completed) is
    // noticed within a second instead of after the whole period.
    for elapsedSec := 1 to CPollPeriodSec do
    begin
      Sleep(1000);
      if input.IsCompleted then
        Break;
    end;
  end;
end;
// Second pipeline stage (sentinel variant). An integer 0 input is the sentinel
// and is forwarded unchanged; any other input is treated as a file name whose
// text is converted by GetJSON_ and parsed with SO() into `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  if input.IsInteger and (input.AsInteger = 0) then
    output := 0 // if we got 'sentinel' value send it to the next stage
  else
  begin
    fileLines := TStringList.Create;
    try
      fileLines.LoadFromFile(input.AsString);
      // GetJSON_ is imported from my.dll and fills jsonText.
      GetJSON_(PChar(fileLines.Text), jsonText);
      output := SO(jsonText);
      // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
    finally
      fileLines.Free;
    end;
  end;
end;
// Third pipeline stage (sentinel variant): sends WM_ENDED for every integer 0
// sentinel it receives and treats every other item as an ISuperObject.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
begin
  for item in input do
    if item.IsInteger and (item.AsInteger = 0) then
      task.Comm.Send(WM_ENDED) // if we got 'sentinel' value
    else
    begin
      parsed := item.AsInterface as ISuperObject;
      // do something with JSON
    end;
end;
//
// Start button handler: disables the button, then builds and runs the
// three-stage pipeline.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.
使用 Exception 作为哨兵的替代方案(尚未奏效,但我可能做错了什么):
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
Dialogs, StdCtrls, superobject,
OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
const
WM_STARTED = WM_USER;
WM_ENDED = WM_USER + 1;
type
ESentinelException = class(Exception);
// Main form of the exception-sentinel sample: hosts the Start/Stop buttons and
// the log list box, and owns the running pipeline.
TForm1 = class(TForm)
btnStart: TButton;
btnStop: TButton;
lbLog: TListBox;
procedure btnStartClick(Sender: TObject);
procedure btnStopClick(Sender: TObject);
private
// Set to True by WMStarted and to False by WMEnded.
FIsBusy: boolean;
// Pipeline created by btnStartClick; set to nil by btnStopClick.
FPipeline: IOmniPipeline;
// Handlers for the messages the stages send through task.Comm.
procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
strict protected
// Pipeline stages, in the order btnStartClick registers them.
procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
end;
var
Form1: TForm1;
procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
implementation
uses IOUtils;
{$R *.dfm}
// First pipeline stage (exception-sentinel variant). Sends WM_STARTED, queues
// every '*.txt' file found next to the executable into `output`, then raises
// ESentinelException.
// NOTE(review): the raise is unconditional, so the waiting loop below it is
// unreachable and the procedure leaves through the exception after the first
// scan. How the pipeline forwards that exception to later stages is not
// visible here - confirm against the OmniThreadLibrary documentation.
procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
i: integer;
f: string;
begin
while not input.IsCompleted do begin
task.Comm.Send(WM_STARTED); // sent at the start of the scan
for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
begin
output.TryAdd(f);
Sleep(1000); // simulate work
end;
raise ESentinelException.Create('sentinel');
// The folder was meant to be polled for new files indefinitely, with a
// period of about one minute (60 s); the code below never runs because of
// the raise above.
i := 60;
repeat
Sleep(1000); // wake up every second so a Stop request (input completed) is noticed
if input.IsCompleted then Break;
dec(i);
until i < 0;
end;
end;
// Second pipeline stage: loads the file named by `input`, hands its text to
// the external GetJSON_ routine and stores the SO()-parsed result in `output`.
procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  fileLines: TStringList;
  jsonText: WideString;
begin
  fileLines := TStringList.Create;
  try
    fileLines.LoadFromFile(input.AsString);
    // GetJSON_ is imported from my.dll and fills jsonText.
    GetJSON_(PChar(fileLines.Text), jsonText);
    output := SO(jsonText);
    // TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    fileLines.Free;
  end;
end;
// Third pipeline stage (exception-sentinel variant): an item carrying an
// ESentinelException triggers WM_ENDED and the exception object is freed;
// every other item is treated as an ISuperObject.
procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  item: TOmniValue;
  parsed: ISuperObject;
  isSentinel: boolean;
begin
  for item in input do
  begin
    isSentinel := item.IsException and (item.AsException is ESentinelException);
    if isSentinel then
    begin
      task.Comm.Send(WM_ENDED); // if we got 'sentinel' Exception
      item.AsException.Free;
    end
    else
    begin
      parsed := item.AsInterface as ISuperObject;
      // do something with JSON
    end;
  end;
end;
//
// Start button handler: disables the button, then builds the three-stage
// pipeline with HandleExceptions enabled and runs it.
procedure TForm1.btnStartClick(Sender: TObject);
var
  pipeline: IOmniPipeline;
begin
  btnStart.Enabled := False;
  // The first and last stages report to this form through OnMessage(Self).
  pipeline := Parallel.Pipeline;
  pipeline := pipeline.Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.Stage(Async_Parse);
  pipeline := pipeline.Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self));
  pipeline := pipeline.HandleExceptions;
  FPipeline := pipeline.Run;
end;
// Stop button handler: completes the pipeline input (the condition the first
// stage polls), drops the pipeline reference and re-enables Start.
procedure TForm1.btnStopClick(Sender: TObject);
begin
  if FPipeline <> nil then
  begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;
  btnStart.Enabled := True;
end;
//
// WM_ENDED handler: clears the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMEnded(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := False;
  logLine := Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
// WM_STARTED handler: sets the busy flag and appends a timestamped line to
// the log, selecting the new entry.
procedure TForm1.WMStarted(var msg: TOmniMessage);
var
  logLine: string;
begin
  FIsBusy := True;
  logLine := Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]);
  lbLog.ItemIndex := lbLog.Items.Add(logLine);
end;
end.