使用 IOmniPipeline 下载和处理文件

Download and process files with IOmniPipeline

我的目标是编写一个 VCL 应用程序:需要同时下载若干文件(URL 存储在 TDataSet 中),然后逐个处理每个已下载的文件(一个接一个)。在应用程序工作期间 GUI 不得挂起,用户应该能够在任何阶段取消(中断)整个过程。

我完全重写了我的第一个示例,现在不再依赖第三方 classes(只使用 Omni Thread Library 3.07.6 和 VCL)。当然,它仍然只是演示,代码中也删掉了一些检查,但不幸的是,这个示例仍然不短。下载部分基于这个答案(谢谢!)。

因此,当任何文件下载时,我需要在 GUI 中显示该文件的进度。下载器 class 生成 "event" OnProgressChange(因为在实际应用中我将使用 Alcinoe 库中的 TALWinInetHTTPClient class 实例并且它具有真实事件 OnProgressChange)。我认为在数据库中写入进度值就足够了,然后 DBGrid 正确显示进度值。

当然我看到了这个答案(以及其他一些与 OTL 相关的答案): (和我的任务差不多,细节上有区别)

我也看到了 OTL 文档和 examples 但我找不到一些执行类似任务的真实示例。

我创建了一些 classes 来解决这个任务,它可以工作,但有一些严重的问题:

  1. 第一阶段的下载没有并行启动(不是同时进行,而是一个接一个地进行)。

  2. 无法正常取消。

下面是一些代码来说明我的问题。它包含两个单元,一个是主窗体(GUI、准备数据、与用户交互),第二个是管道包装器和下载器。

主窗体的 DFM 是:

object fmMain: TfmMain
  Left = 628
  Top = 172
  Caption = 'WorkSpace preparer'
  ClientHeight = 262
  ClientWidth = 700
  Color = clBtnFace
  Font.Charset = DEFAULT_CHARSET
  Font.Color = clWindowText
  Font.Height = -13
  Font.Name = 'Segoe UI'
  Font.Style = []
  OldCreateOrder = False
  Position = poScreenCenter
  OnCloseQuery = FormCloseQuery
  OnCreate = FormCreate
  PixelsPerInch = 96
  TextHeight = 17
  object DBGridApps: TDBGrid
    AlignWithMargins = True
    Left = 3
    Top = 3
    Width = 694
    Height = 207
    Align = alClient
    DataSource = dsApps
    ReadOnly = True
    TabOrder = 0
    TitleFont.Charset = DEFAULT_CHARSET
    TitleFont.Color = clWindowText
    TitleFont.Height = -13
    TitleFont.Name = 'Segoe UI'
    TitleFont.Style = []
  end
  object Panel1: TPanel
    AlignWithMargins = True
    Left = 3
    Top = 216
    Width = 694
    Height = 43
    Align = alBottom
    TabOrder = 1
    object bbExit: TBitBtn
      AlignWithMargins = True
      Left = 549
      Top = 4
      Width = 141
      Height = 35
      Align = alRight
      Caption = 'Exit'
      TabOrder = 0
      OnClick = bbExitClick
    end
    object bbCancel: TBitBtn
      AlignWithMargins = True
      Left = 151
      Top = 4
      Width = 141
      Height = 35
      Align = alLeft
      Caption = 'Cancel'
      TabOrder = 1
      OnClick = bbCancelClick
      ExplicitTop = 0
    end
    object bbStart: TBitBtn
      AlignWithMargins = True
      Left = 4
      Top = 4
      Width = 141
      Height = 35
      Align = alLeft
      Caption = 'Start'
      TabOrder = 2
      OnClick = bbStartClick
    end
  end
  object dsApps: TDataSource
    DataSet = cdsApps
    Left = 32
    Top = 88
  end
  object cdsApps: TClientDataSet
    Aggregates = <>
    Params = <>
    Left = 16
    Top = 72
  end
end

主窗体代码:

unit MainForm;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, DB, DBClient, Grids, DBGrids, StdCtrls, Buttons, ExtCtrls,
  PipelineHolder;

type
  // Main form of the demo. Hosts the grid bound to the in-memory dataset of
  // applications and the Start / Cancel / Exit buttons that drive the
  // download-and-install pipeline.
  TfmMain = class(TForm)
    DBGridApps: TDBGrid;
    dsApps: TDataSource;
    Panel1: TPanel;
    bbExit: TBitBtn;
    bbCancel: TBitBtn;
    bbStart: TBitBtn;
    cdsApps: TClientDataSet;
    procedure bbExitClick(Sender: TObject);
    procedure bbCancelClick(Sender: TObject);
    procedure bbStartClick(Sender: TObject);
    procedure FormCloseQuery(Sender: TObject; var CanClose: Boolean);
    procedure FormCreate(Sender: TObject);
  private
    { Private declarations }
    // Pipeline wrapper; created lazily on the first Start click.
    // NOTE(review): no code in this unit frees PH - confirm whether that is intended.
    PH : TPipelineHolder;
    // Flips the Enabled state of the Start button.
    procedure SwitchControlState;
  public
    { Public declarations }
  end;

var
  fmMain: TfmMain;

implementation

{$R *.dfm}

// Exit button handler: requests the form to close. The close request goes
// through FormCloseQuery, which asks the user for confirmation.
procedure TfmMain.bbExitClick(Sender: TObject);
begin
  Close;
end;

// Cancel button handler: stops the pipeline (if a holder was ever created)
// and makes the Start button available again.
procedure TfmMain.bbCancelClick(Sender: TObject);
begin
  // Nothing was ever started, so there is nothing to cancel.
  if not Assigned(PH) then Exit;

  PH.Stop;

  // Enable Start explicitly instead of toggling it: Cancel can be clicked more
  // than once (and is also clicked from FormCloseQuery), and a toggle would
  // then leave Start disabled while no job is running.
  bbStart.Enabled := True;
end;

// Start button handler: creates the pipeline holder on first use, flips the
// Start button state and feeds the application dataset to the pipeline.
procedure TfmMain.bbStartClick(Sender: TObject);
begin
  // The holder is created once and reused for subsequent runs.
  if PH = nil then
    PH := TPipelineHolder.Create;

  SwitchControlState;
  PH.Make(cdsApps);
end;

// Asks the user to confirm closing the form. When the user confirms, the
// Cancel button is clicked programmatically so a running job gets stopped.
procedure TfmMain.FormCloseQuery(Sender: TObject; var CanClose: Boolean);
const
  ConfirmFlags = MB_YESNO + MB_ICONQUESTION + MB_DEFBUTTON2 + MB_TOPMOST;
var
  Answer: Integer;
begin
  Answer := MessageBox(0, 'Exit now?', 'Exit', ConfirmFlags);
  CanClose := (Answer = IDYES);
  if not CanClose then Exit;

  bbCancel.Click;
end;

// Builds the in-memory dataset structure and fills it with five demo rows
// (four installer downloads and one non-executable test file).
procedure TfmMain.FormCreate(Sender: TObject);
const
  // The first four demo rows share the same title and download URL.
  VBoxTitle = 'VirtualBox 6.0.10';
  VBoxUrl   = 'https://download.virtualbox.org/virtualbox/6.0.10/VirtualBox-6.0.10-132072-Win.exe';

  // Appends one row. Progress always starts at 0 and status at 'Ready';
  // fields are written by position, in the order the FieldDefs are added below.
  procedure AddApp(const AId, AUid, ATitle, AUrl, ASilentParams, ATargetFile: string);
  begin
    cdsApps.Append;
    cdsApps.Fields[0].AsFloat  := 0.0;
    cdsApps.Fields[1].AsString := 'Ready';
    cdsApps.Fields[2].AsString := AId;
    cdsApps.Fields[3].AsString := AUid;
    cdsApps.Fields[4].AsString := ATitle;
    cdsApps.Fields[5].AsString := AUrl;
    cdsApps.Fields[6].AsString := ASilentParams;
    cdsApps.Fields[7].AsString := ATargetFile;
    cdsApps.Post;
  end;

begin
  // Prepare dataset
  cdsApps.Close;
  cdsApps.FieldDefs.Add('progress', ftFloat);
  cdsApps.FieldDefs.Add('status', ftString, 30);
  cdsApps.FieldDefs.Add('id', ftString, 30);
  cdsApps.FieldDefs.Add('uid', ftString, 30);
  cdsApps.FieldDefs.Add('title', ftString, 30);
  cdsApps.FieldDefs.Add('url', ftString, 255);
  cdsApps.FieldDefs.Add('silent_parameters', ftString, 255);
  cdsApps.FieldDefs.Add('target_file', ftString, 255);
  cdsApps.CreateDataSet;
  cdsApps.LogChanges := False;
  cdsApps.Open;

  // The URLs below can be changed freely.
  // The example uses the VirtualBox distribution from: https://www.virtualbox.org/wiki/Downloads
  // For correct progress values the web server must respond with a correct
  // Content-Length value and must support the HEAD command.
  AddApp('5be2e746ce46a1000cdc8b90', 'SomeApp1', VBoxTitle, VBoxUrl, '/S', '001_installer.exe');
  AddApp('5be31c63ce46a1000b268bb2', 'SomeApp2', VBoxTitle, VBoxUrl, '', '002_installer.exe');
  AddApp('5be3428ace46a1000b268bc0', 'SomeApp3', VBoxTitle, VBoxUrl, '/VERY_SILENT', '003_installer.exe');
  AddApp('5be3428ace46a1000b268bc1', 'SomeApp4', VBoxTitle, VBoxUrl, '', '004_installer.exe');

  // Record 5 is not an executable, just a plain test file; per the original
  // author's note the Install step is expected to end with an error status for it.
  AddApp('5be512bb4a9bbb000b6de944', 'SomeFile', 'Demo File (not executable)',
    'https://speed.hetzner.de/100MB.bin', '', '005_sample_100MB.bin');

  cdsApps.First;
end;

// Toggles the Start button: enabled becomes disabled and vice versa.
// The result depends on the current state, not on whether a job is running.
procedure TfmMain.SwitchControlState;
begin
  bbStart.Enabled := not bbStart.Enabled;
end;

end.

管道工作实施的第二单元:

unit PipelineHolder;

interface

uses
  Windows, SysUtils, Classes, OtlCommon, OtlCollections, OtlParallel, Forms,
  DB, Generics.Defaults, StrUtils, Generics.Collections, Messages, OtlComm,
  OtlTask, OtlTaskControl, ShellAPI, Dialogs, OtlSync, Math, WinInet;

// Messages
const
  // Custom window message sent by a downloader to the pipeline holder's
  // window; LParam carries a PProgressInfo pointer.
  WM_PROGRESSCHANGED = WM_APP + 105;

// Process states
type
  // Lifecycle states of one application row, with explicit ordinal values.
  TAppState  = (asReady = 0, asCancelled = 1, asError = 2, asDownloading = 3, asDownloaded = 4, asInstalling = 5, asCompleted = 6);
  // Display text for every TAppState value.
  TAppStateNames = array[asReady..asCompleted] of string;
const
  // Text written to the dataset's status field. Note that asCancelled is
  // shown as 'Canceled' and asCompleted as 'Installed'.
  AppState: TAppStateNames = ('Ready', 'Canceled', 'Error', 'Downloading', 'Downloaded', 'Installing', 'Installed');

type
  // Data structs for progress message
  // Payload of WM_PROGRESSCHANGED: how much of which item has been downloaded.
  PProgressInfo = ^TProgressInfo;
  TProgressInfo = record
    Read  : Int64;   // bytes downloaded so far
    Total : Int64;   // expected total size in bytes
    ID    : string;  // value of the dataset 'id' field of the item
    URL   : string;  // URL being downloaded
  end;

  //Structure for record info
  // One work item passed through the pipeline stages (wrapped in a TOmniValue).
  TRecordInfo = record
    Filename: string;  // full path of the local target file
    URL: string;       // download URL
    ID: string;        // value of the dataset 'id' field
    Cmd : string;      // command-line parameters passed to the installer
  end;

  // Class for downloading
  // Downloads a single file with WinINet and reports progress by sending
  // WM_PROGRESSCHANGED to the window handle given at construction.
  TDBAppItem = class
  private
    FHandle : HWND;         // window that receives progress messages
    FDS : TDataSet;         // dataset reference; only stored and cleared in this unit
    FFilename: string;      // local target file
    FURL: string;           // source URL
    FId: string;            // item id echoed back in progress messages
    FCmd : string;          // installer parameters; only stored in this unit
    FFileSize : Int64;      // expected size; -1 means "not queried yet"
    FDownloaded : Int64;    // bytes read so far in the current download
    // Returns the message text for a WinINet/system error code.
    function GetWinInetError(ErrorCode: Cardinal): string;
    // Splits a URL into host name and resource path.
    procedure ParseURL(const lpszUrl: string; var Host, Resource: string);
    // Queries the Content-Length of the URL with a HEAD request.
    // NOTE(review): the Integer result limits the reported size to 32 bits.
    function GetRemoteFileSize(const Url : string): Integer;
    // Downloads url into TargetFileName, reporting progress per chunk.
    function DownloadFile(const url: string; const TargetFileName: string): boolean;
    // Sends the current progress to FHandle.
    // NOTE(review): Read/Total are Integer while the counters are Int64.
    procedure InternalDownloadProgress(Sender: TObject; Read: Integer; Total: Integer);
  public
    constructor Create(const OwnerHandle: HWND; var DS: TDataSet; const URL, ID: string; const Cmd: string; const TargetFilename: string);
    destructor Destroy; override;
    // Prepares the target location and downloads the file; True on success.
    function Download : Boolean; overload;
  end;


  // Main class, pipeline holder
  // Owns the two-stage OTL pipeline (download, then install) and a hidden
  // window that receives progress messages from the downloaders.
  TPipelineHolder = class
  private
    FDS : TDataSet;             // dataset the items are read from and status is written to
    FHandle : HWND;             // window allocated in Create, target of WM_PROGRESSCHANGED
    FPipeline : IOmniPipeline;  // running pipeline, nil when idle
    FInProcess: Boolean;        // True between Make and Stop/JobDone; polled by the download stage
    // Download stage, single-value form (one input value -> one output value).
    procedure Retrieve(const input: TOmniValue; var output: TOmniValue);
    // Download stage, collection form (this is the one wired up in Make).
    procedure RetrieveAll(const input, output: IOmniBlockingCollection);
    // Install stage: runs every downloaded file and records the outcome.
    procedure Install(const input, output: IOmniBlockingCollection);
    // Pipeline OnStop callback.
    procedure JobDone;
    // Window procedure of the window behind FHandle.
    procedure WndProc(var Message: TMessage);
    // Writes download progress into the dataset.
    procedure WMProgressChanged(var msg: TMessage); message WM_PROGRESSCHANGED;
  public
    constructor Create;
    destructor Destroy; override;

    // Builds and starts the pipeline for all rows of SourceDS that have a URL.
    procedure Make(SourceDS : TDataSet);
    // Cancels the pipeline; returns True when no job is marked as running afterwards.
    function Stop: Boolean;
    property InProcess: Boolean read FInProcess write FInProcess;
  end;


implementation

{ Tools }
// Launches Filename with Params through the shell 'runas' verb and returns
// whether ShellExecuteEx reported success. The function does not wait for the
// started process to finish.
function RunAsAdmin(const Handle: HWnd; const Filename, Params: string): Boolean;
var
  ExecInfo: TShellExecuteInfo;
begin
  // Start from an all-zero structure so unused members are well defined.
  ZeroMemory(@ExecInfo, SizeOf(ExecInfo));

  ExecInfo.cbSize       := SizeOf(ExecInfo);
  ExecInfo.fMask        := SEE_MASK_FLAG_DDEWAIT or SEE_MASK_FLAG_NO_UI;
  ExecInfo.Wnd          := Handle;
  ExecInfo.lpVerb       := 'runas';
  ExecInfo.lpFile       := PChar(Filename);
  ExecInfo.lpParameters := PChar(Params);
  ExecInfo.nShow        := SW_SHOWNORMAL;

  Result := ShellExecuteEx(@ExecInfo);
end;

{TPipelineHolder}

// Creates the holder and allocates the window whose handle is later passed to
// every downloader as the target for WM_PROGRESSCHANGED messages.
constructor TPipelineHolder.Create;
begin
  inherited Create;
  // Messages for this window are routed to WndProc.
  FHandle := AllocateHWnd(WndProc);
  FInProcess := False;
end;

// Cancels a pipeline that is still marked as running, releases the message
// window and destroys the holder.
destructor TPipelineHolder.Destroy;
begin
  // Only a job that is flagged as running and still has a pipeline is cancelled.
  if FInProcess and Assigned(FPipeline) then
    begin
      FPipeline.Cancel;
      FPipeline := nil;
      FInProcess := False;
    end;

  if FHandle <> 0 then
    DeallocateHWnd(FHandle);

  inherited;
end;

// Install stage of the pipeline. For every downloaded item whose file exists
// on disk it marks the row as 'Installing', starts the file elevated via
// RunAsAdmin and then records 'Installed' or 'Error'.
//
// input  - items produced by the download stage (TRecordInfo in a TOmniValue)
// output - not used; this is the last stage
//
// This method runs in a pipeline worker thread while FDS is attached to
// visual controls, so every dataset update is marshalled to the main thread
// with TThread.Synchronize instead of touching the dataset directly.
//
// In a real application the downloaded file would have to be started as a
// separate process and waited for; RunAsAdmin only reports whether the
// launch succeeded.
procedure TPipelineHolder.Install(const input, output: IOmniBlockingCollection);
var
  app        : TOmniValue;
  AppInfo    : TRecordInfo;
  ID         : string;
  State      : string;
  WriteState : TThreadProcedure;
begin
  // Writes the current State into the row identified by ID. ID and State are
  // captured by reference, so the routine always sees their latest values.
  WriteState :=
    procedure
    begin
      FDS.DisableControls;
      try
        if FDS.Locate('id', ID, [loCaseInsensitive]) then
          begin
            FDS.Edit;
            FDS.FieldByName('Status').AsString := State;
            FDS.Post;
          end;
      finally
        FDS.EnableControls;
      end;
    end;

  for app in input do
    begin
      if app.IsEmpty then Continue;

      AppInfo := app.ToRecord<TRecordInfo>;
      // Items without a file on disk are skipped silently, as before.
      if (AppInfo.Filename = EmptyStr) or not FileExists(AppInfo.Filename) then
        Continue;
      ID := AppInfo.ID;

      // Change file state
      State := AppState[asInstalling];
      TThread.Synchronize(nil, WriteState);

      // Try to execute the installer
      if RunAsAdmin(Application.Handle, AppInfo.Filename, AppInfo.Cmd) then
        State := AppState[asCompleted]
      else
        State := AppState[asError];

      // Change state again
      TThread.Synchronize(nil, WriteState);
    end;
end;

// Download stage, single-value form: downloads the item described by input.
// On success the same record is passed on as output; on failure output is
// cleared. Nothing happens when the job is no longer marked as running.
procedure TPipelineHolder.Retrieve(const input: TOmniValue; var output: TOmniValue);
var
  Item : TDBAppItem;
  Rec  : TRecordInfo;
begin
  // FInProcess doubles as the cancellation flag.
  if FInProcess then
    begin
      Rec := input.ToRecord<TRecordInfo>;
      Item := TDBAppItem.Create(FHandle, FDS, Rec.URL, Rec.ID, Rec.Cmd, Rec.Filename);
      try
        if not Item.Download then
          output.Clear
        else
          output := TOmniValue.FromRecord<TRecordInfo>(Rec);
      finally
        Item.Free;
      end;
    end;
end;

// Download stage, collection form: takes items from input one by one,
// downloads each and forwards the successfully downloaded ones to output.
// Processing stops as soon as the job is no longer marked as running.
procedure TPipelineHolder.RetrieveAll(const input,
  output: IOmniBlockingCollection);
var
  Item    : TDBAppItem;
  Rec     : TRecordInfo;
  Current : TOmniValue;
begin
  for Current in input do
    begin
      // FInProcess doubles as the cancellation flag; nothing follows the
      // loop, so leaving it ends the stage.
      if not FInProcess then Break;

      Rec := Current.ToRecord<TRecordInfo>;
      Item := TDBAppItem.Create(FHandle, FDS, Rec.URL, Rec.ID, Rec.Cmd, Rec.Filename);
      try
        // Failed downloads are dropped, they never reach the next stage.
        if Item.Download then
          output.Add(TOmniValue.FromRecord<TRecordInfo>(Rec));
      finally
        Item.Free;
      end;
    end;
end;

// Cancels the running pipeline, if any, and clears the running flag.
// Returns True when no job is marked as running afterwards.
function TPipelineHolder.Stop: Boolean;
begin
  // Both conditions must hold: a job is flagged as running and a pipeline exists.
  if FInProcess and Assigned(FPipeline) then
    begin
      FPipeline.Cancel;
      FPipeline := nil;
      FInProcess := False;
    end;

  Result := not FInProcess;
end;

// Handler for WM_PROGRESSCHANGED. msg.LParam points to a TProgressInfo that
// is copied immediately; the percentage derived from it is written to the
// 'Progress' field of the row whose 'id' matches, and the row is marked
// 'Downloaded' once the percentage reaches 99.
//
// Messages with an unknown or invalid total (Total <= 0) are ignored: a
// percentage cannot be computed for them and the division would otherwise
// fail or produce a meaningless value.
procedure TPipelineHolder.WMProgressChanged(var msg: TMessage);
var
  Info    : TProgressInfo;
  Percent : Double;
  Rounded : Integer;
begin
  if msg.LParam = 0 then Exit;
  // Copy the payload first; the sender owns the memory behind the pointer.
  Info := PProgressInfo(msg.LParam)^;

  if Info.Total <= 0 then Exit;
  if FDS = nil then Exit;

  Percent := 100 * Info.Read / Info.Total;
  Rounded := Round(Percent);

  // Write data to db. Controls are disabled before Locate so the grid does
  // not follow the cursor while the row is looked up.
  FDS.DisableControls;
  try
    if FDS.Locate('id', Info.ID, [loCaseInsensitive]) then
      begin
        // Compare against the same rounded value that gets stored, so an
        // unchanged percentage does not cause a redundant Edit/Post.
        if FDS.FieldByName('Progress').AsFloat <> Rounded then
          begin
            FDS.Edit;
            FDS.FieldByName('Progress').AsFloat := Rounded;
            if Percent >= 99 then
              FDS.FieldByName('Status').AsString := AppState[asDownloaded];
            FDS.Post;
          end;
      end;
  finally
    FDS.EnableControls;
  end;
end;

// Window procedure of the window allocated in Create.
// WM_PROGRESSCHANGED is routed to its message handler through Dispatch; every
// other message is passed to DefWindowProc. This class has no ancestor window
// procedure, so without that call system messages for the window would never
// receive default processing.
procedure TPipelineHolder.WndProc(var Message: TMessage);
begin
  if Message.Msg = WM_PROGRESSCHANGED then
    Dispatch(Message)
  else
    Message.Result := DefWindowProc(FHandle, Message.Msg, Message.WParam, Message.LParam);
end;

// Callback registered with the pipeline's OnStop in Make: drops the pipeline
// reference and clears the running flag.
procedure TPipelineHolder.JobDone;
begin
  FPipeline := nil;
  FInProcess := False;
end;

// Builds and starts the two-stage pipeline (download -> install) and feeds it
// with one TRecordInfo per row of SourceDS that has a non-empty URL. Each
// queued row gets the 'Downloading' status. The method returns immediately;
// completion is signalled through JobDone.
//
// Does nothing when SourceDS is nil, inactive or empty, or when a job is
// already running (starting a second pipeline would drop the reference to the
// one still at work).
procedure TPipelineHolder.Make(SourceDS: TDataSet);
var
  BM      : TBookmark;
  RecInfo : TRecordInfo;
begin
  if SourceDS = nil then Exit;
  if not SourceDS.Active then Exit;
  if SourceDS.IsEmpty then Exit;
  if FInProcess then Exit;

  FDS := SourceDS;
  FInProcess := True;

  // Per the original author's note, using Retrieve or RetrieveAll for the
  // first stage gives the same effect.
  FPipeline := Parallel.Pipeline
   .Stage(RetrieveAll, //Retrieve
 Parallel.TaskConfig.OnMessage(Self)).NumTasks(Environment.Process.Affinity.Count * 2)
   .Stage(Install)
   .OnStop(JobDone)
   .Run;

  try
    // Get URLs to be downloaded
    BM := FDS.GetBookmark;
    FDS.DisableControls;
    try
      try
        FDS.First;
        while not FDS.Eof do
          begin
            // Get data from database
            RecInfo.URL := Trim(FDS.FieldByName('url').AsString);
            RecInfo.Id := Trim(FDS.FieldByName('id').AsString);
            RecInfo.Cmd := Trim(FDS.FieldByName('silent_parameters').AsString);
            RecInfo.Filename := ExtractFilePath(ParamStr(0)) + 'Downloads\' + Trim(FDS.FieldByName('target_file').AsString);

            if RecInfo.URL = EmptyStr then
              begin
                // Skips empty URLs
                FDS.Next;
                Continue;
              end;
            FDS.Edit;
            FDS.FieldByName('Status').AsString := AppState[asDownloading];
            FDS.Post;

            FPipeline.Input.Add(TOmniValue.FromRecord<TRecordInfo>(RecInfo));
            FDS.Next;
          end;
      finally
        // Put the cursor back where the user left it.
        if FDS.BookmarkValid(BM) then SourceDS.GotoBookmark(BM);
        FDS.FreeBookmark(BM);
      end;
    finally
      // Re-enable the controls even if restoring the bookmark failed.
      FDS.EnableControls;
    end;
  finally
    // Always close the input, also when reading the dataset raised:
    // otherwise the stages would wait for more input forever and JobDone
    // would never be called.
    FPipeline.Input.CompleteAdding;
  end;

  // Wait for pipeline to complete - not used, to avoid freezing the GUI
//  FPipeline.WaitFor(INFINITE);
end;

// Stores everything needed for one download: the window that receives
// progress messages, the dataset reference, the source URL, the item id, the
// installer parameters and the local target file.
constructor TDBAppItem.Create(const OwnerHandle: HWND; var DS: TDataSet; const URL, ID, Cmd, TargetFilename: string);
begin
  inherited Create;

  // Where progress goes and which dataset the item belongs to.
  FHandle   := OwnerHandle;
  FDS       := DS;

  // Description of the item itself.
  FId       := ID;
  FURL      := URL;
  FCmd      := Cmd;
  FFilename := TargetFilename;

  // Download counters: the size is unknown (-1) until it has been queried.
  FDownloaded := 0;
  FFileSize   := -1;
end;

// Clears the dataset reference (the dataset itself is not freed here) and
// destroys the item.
destructor TDBAppItem.Destroy;
begin
  FDS := nil;
  inherited;
end;

// Prepares the target location and downloads FURL into FFilename.
//
// The target directory is created when missing and an existing target file
// is deleted first. Returns True when DownloadFile succeeded and the target
// file exists afterwards.
//
// Raises Exception when the directory cannot be created or the old file
// cannot be deleted; in the latter case the message includes the OS error text.
function TDBAppItem.Download: Boolean;
var
  TargetDir : string;
  OsError   : Cardinal;
begin
  TargetDir := ExtractFilePath(FFilename);
  // A bare file name has no directory part, so there is nothing to create.
  if (TargetDir <> '') and not DirectoryExists(TargetDir) then
    if not ForceDirectories(TargetDir) then
      raise Exception.Create('Cannot create directory: "'+TargetDir+'".');

  if FileExists(FFilename) and not DeleteFile(FFilename) then
    begin
      // Read the error code right away, before any other call can overwrite it.
      OsError := GetLastError;
      raise Exception.Create('Cannot delete file: "'+FFilename+'".'+sLineBreak + SysErrorMessage(OsError));
    end;

  Result := DownloadFile(FURL, FFilename) and FileExists(FFilename);
end;

// Downloads url into TargetFileName with WinINet, reading in chunks and
// reporting progress after every chunk through InternalDownloadProgress.
//
// Returns True only when the URL could be opened and the whole stream was
// read without a read error. The remote size is queried first when it is not
// known yet; exceptions raised by that query propagate to the caller.
//
// The local file and both WinINet handles are released on every path,
// including exceptions, and the size/progress counters are reset on exit.
//
// NOTE(review): the HTTP status code is not checked, so an error page
// returned by the server is stored like a normal file.
function TDBAppItem.DownloadFile(const url, TargetFileName: string): boolean;
var
  hInet: HINTERNET;
  hFile: HINTERNET;
  localFile: file;
  buffer: array[1..65535] of Byte;
  bytesRead: DWORD;
begin
  Result := False;
  if FFileSize < 0 then FFileSize := GetRemoteFileSize(url);
  FDownloaded := 0;
  try
    hInet := WinInet.InternetOpen('MyFileAgent', INTERNET_OPEN_TYPE_DIRECT, nil, nil, 0);
    if not Assigned(hInet) then Exit;
    try
      hFile := InternetOpenURL(hInet, PChar(url), nil, 0, INTERNET_FLAG_PRAGMA_NOCACHE, 0);
      // The URL could not be opened: report failure instead of success.
      if not Assigned(hFile) then Exit;
      try
        AssignFile(localFile, TargetFileName);
        Rewrite(localFile, 1);
        try
          repeat
            // Reset per iteration so a failed call cannot leave a stale count.
            bytesRead := 0;
            if not InternetReadFile(hFile, @buffer, SizeOf(buffer), bytesRead) then
              Exit;
            if bytesRead > 0 then
              BlockWrite(localFile, buffer, bytesRead);
            Inc(FDownloaded, bytesRead);
            // In the real application this event is raised by TALWinHttpClient
            // from the Alcinoe library.
            InternalDownloadProgress(Self, FDownloaded, FFileSize);
          until bytesRead = 0;
          Result := True;
        finally
          CloseFile(localFile);
        end;
      finally
        InternetCloseHandle(hFile);
      end;
    finally
      InternetCloseHandle(hInet);
    end;
  finally
    FFileSize := -1;
    FDownloaded := 0;
  end;
end;

// Returns the Content-Length reported by the server for Url, obtained with a
// HEAD request.
//
// The connection uses HTTPS (port 443, secure flag) when the URL starts with
// 'https://' and plain HTTP (port 80) otherwise, so https URLs no longer
// depend on the server redirecting from port 80.
//
// Raises Exception when any WinINet call fails, including the case where the
// response carries no Content-Length header.
//
// NOTE(review): an explicit port inside the URL is not honoured because
// ParseURL only returns host and resource; the size is limited to 32 bits.
function TDBAppItem.GetRemoteFileSize(const Url: string): Integer;
const
  sUserAgent = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36';
var
  hInet    : HINTERNET;
  hConnect : HINTERNET;
  hRequest : HINTERNET;
  lpdwBufferLength: DWORD;
  lpdwReserved    : DWORD;
  ServerName, Resource: string;
  ErrorCode : Cardinal;
  Port      : INTERNET_PORT;
  ReqFlags  : DWORD;
begin
  Result := -1;
  ParseURL(Url, ServerName, Resource);

  // Pick port and request flags from the URL scheme.
  if SameText(Copy(Url, 1, 8), 'https://') then
    begin
      Port := INTERNET_DEFAULT_HTTPS_PORT;
      ReqFlags := INTERNET_FLAG_SECURE;
    end
  else
    begin
      Port := INTERNET_DEFAULT_HTTP_PORT;
      ReqFlags := 0;
    end;

  hInet := InternetOpen(PChar(sUserAgent), INTERNET_OPEN_TYPE_PRECONFIG, nil, nil, 0);
  if hInet=nil then
    begin
      ErrorCode:=GetLastError;
      raise Exception.Create(Format('InternetOpen Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
    end;

  try
    hConnect := InternetConnect(hInet, PChar(ServerName), Port, nil, nil, INTERNET_SERVICE_HTTP, 0, 0);
    if hConnect=nil then
      begin
        ErrorCode:=GetLastError;
        raise Exception.Create(Format('InternetConnect Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
      end;

    try
      hRequest := HttpOpenRequest(hConnect, PChar('HEAD'), PChar(Resource), nil, nil, nil, ReqFlags, 0);
      if hRequest=nil then
        begin
          ErrorCode:=GetLastError;
          raise Exception.Create(Format('HttpOpenRequest Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
        end;

      try
        lpdwBufferLength := SizeOf(Result);
        lpdwReserved     := 0;
        if not HttpSendRequest(hRequest, nil, 0, nil, 0) then
          begin
            ErrorCode := GetLastError;
            // Name the call that actually failed.
            raise Exception.Create(Format('HttpSendRequest Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
          end;

        // HTTP_QUERY_FLAG_NUMBER makes WinINet store the header as a 32-bit number.
        if not HttpQueryInfo(hRequest, HTTP_QUERY_CONTENT_LENGTH or HTTP_QUERY_FLAG_NUMBER, @Result, lpdwBufferLength, lpdwReserved) then
          begin
            ErrorCode := GetLastError;
            raise Exception.Create(Format('HttpQueryInfo Error %d Description %s',[ErrorCode,GetWinInetError(ErrorCode)]));
          end;
      finally
        InternetCloseHandle(hRequest);
      end;
    finally
      InternetCloseHandle(hConnect);
    end;
  finally
    InternetCloseHandle(hInet);
  end;
end;

// Returns the message text for ErrorCode, looked up first in wininet.dll and
// then in the system message table. Trailing control characters, spaces and
// dots are removed. Returns an empty string when no message is found.
function TDBAppItem.GetWinInetError(ErrorCode: Cardinal): string;
const
  winetdll = 'wininet.dll';
var
  Len: Integer;
  Buffer: PChar;
begin
  // FormatMessage leaves Buffer untouched when it fails, so it must start out
  // as nil; otherwise the clean-up below would free an arbitrary pointer.
  Buffer := nil;
  Len := FormatMessage(
  FORMAT_MESSAGE_FROM_HMODULE or FORMAT_MESSAGE_FROM_SYSTEM or
  FORMAT_MESSAGE_ALLOCATE_BUFFER or FORMAT_MESSAGE_IGNORE_INSERTS or  FORMAT_MESSAGE_ARGUMENT_ARRAY,
  Pointer(GetModuleHandle(winetdll)), ErrorCode, 0, @Buffer, SizeOf(Buffer), nil);
  try
    if Buffer = nil then Len := 0;
    while (Len > 0) and (CharInSet(Buffer[Len - 1], [#0..#32, '.'])) do Dec(Len);
    SetString(Result, Buffer, Len);
  finally
    // The buffer was allocated by FormatMessage (ALLOCATE_BUFFER flag).
    if Buffer <> nil then LocalFree(HLOCAL(Buffer));
  end;
end;

// Splits lpszUrl with InternetCrackUrl.
//
// Host     - receives the host name
// Resource - receives the URL path followed by the query string (if any);
//            a '#fragment' part is not included because it is never sent to
//            the server
//
// Both outputs are empty strings when the URL cannot be parsed.
procedure TDBAppItem.ParseURL(const lpszUrl: string; var Host,
  Resource: string);
var
  lpszScheme      : array[0..INTERNET_MAX_SCHEME_LENGTH - 1] of Char;
  lpszHostName    : array[0..INTERNET_MAX_HOST_NAME_LENGTH - 1] of Char;
  lpszUserName    : array[0..INTERNET_MAX_USER_NAME_LENGTH - 1] of Char;
  lpszPassword    : array[0..INTERNET_MAX_PASSWORD_LENGTH - 1] of Char;
  lpszUrlPath     : array[0..INTERNET_MAX_PATH_LENGTH - 1] of Char;
  lpszExtraInfo   : array[0..1024 - 1] of Char;
  lpUrlComponents : TURLComponents;
  Extra           : string;
  FragmentPos     : Integer;
begin
  ZeroMemory(@lpszScheme, SizeOf(lpszScheme));
  ZeroMemory(@lpszHostName, SizeOf(lpszHostName));
  ZeroMemory(@lpszUserName, SizeOf(lpszUserName));
  ZeroMemory(@lpszPassword, SizeOf(lpszPassword));
  ZeroMemory(@lpszUrlPath, SizeOf(lpszUrlPath));
  ZeroMemory(@lpszExtraInfo, SizeOf(lpszExtraInfo));
  ZeroMemory(@lpUrlComponents, SizeOf(TURLComponents));

  // The dw...Length members are character counts, not byte counts. SizeOf
  // would report twice the real capacity when Char is two bytes wide, so
  // Length is used for every buffer.
  lpUrlComponents.dwStructSize      := SizeOf(TURLComponents);
  lpUrlComponents.lpszScheme        := lpszScheme;
  lpUrlComponents.dwSchemeLength    := Length(lpszScheme);
  lpUrlComponents.lpszHostName      := lpszHostName;
  lpUrlComponents.dwHostNameLength  := Length(lpszHostName);
  lpUrlComponents.lpszUserName      := lpszUserName;
  lpUrlComponents.dwUserNameLength  := Length(lpszUserName);
  lpUrlComponents.lpszPassword      := lpszPassword;
  lpUrlComponents.dwPasswordLength  := Length(lpszPassword);
  lpUrlComponents.lpszUrlPath       := lpszUrlPath;
  lpUrlComponents.dwUrlPathLength   := Length(lpszUrlPath);
  lpUrlComponents.lpszExtraInfo     := lpszExtraInfo;
  lpUrlComponents.dwExtraInfoLength := Length(lpszExtraInfo);

  if not InternetCrackUrl(PChar(lpszUrl), Length(lpszUrl), ICU_DECODE or ICU_ESCAPE, lpUrlComponents) then
    begin
      // Do not hand back partially filled buffers.
      Host := '';
      Resource := '';
      Exit;
    end;

  Host := lpszHostName;

  // Extra info holds '?query' and/or '#fragment'; keep only the query part.
  Extra := lpszExtraInfo;
  FragmentPos := Pos('#', Extra);
  if FragmentPos > 0 then
    Extra := Copy(Extra, 1, FragmentPos - 1);

  Resource := lpszUrlPath;
  Resource := Resource + Extra;
end;

// Reports download progress to the owner window by sending
// WM_PROGRESSCHANGED with a pointer to a TProgressInfo in LParam.
//
// Sender - not used
// Read   - bytes downloaded so far
// Total  - expected total size in bytes
//
// SendMessage returns only after the receiver has processed the message, and
// the receiver copies the record, so the payload can live on this routine's
// stack. This removes the heap record that was allocated for every progress
// tick and never released. The pointer is passed as LPARAM so it keeps its
// full width on 64-bit targets. If this is ever changed to PostMessage the
// payload must be heap-allocated and freed by the receiver.
procedure TDBAppItem.InternalDownloadProgress(Sender: TObject; Read,
  Total: Integer);
var
  Info : TProgressInfo;
begin
  // Create progress changed message
  Info.ID := FId;
  Info.Read := Read;
  Info.Total := Total;
  Info.URL := FURL;

  SendMessage(FHandle, WM_PROGRESSCHANGED, 0, LPARAM(@Info));
end;

end.

我的基本想法是创建包含 2 个阶段的管道:

  1. 检索:同时下载所有文件(线程数受 OTL 的 NumTasks 限制)。

  2. 安装:只要下载了任何文件,就必须在此阶段进行处理。这个阶段的动作必须是一个一个的,即同一时间只能一个动作(在真正的应用程序中我不会同时启动很多安装程序)。

我试图了解 OTL 在这里是如何工作的,但我对这个库的经验还不多。

那么,亲爱的社区,我必须如何将我的代码重写为:

  1. 在第 1 阶段并行下载(现在是一个接一个地进行)。

  2. 有可能使用 GUI 正确停止 Pipeline(现在我通过按下 TButton 调用 FPipeline.Cancel,它不能立即停止任务)。

源代码也放在这里。

提前致谢。我很高兴在这里得到任何建议。

1) 就 OTL 而言,下载可以并行进行。在我的机器上,每次我按 F9 时,测试应用程序都会开始三个并行下载。其他两个下载卡在

hFile := InternetOpenURL(hInet, PChar(url), nil, 0, INTERNET_FLAG_PRAGMA_NOCACHE, 0);

这一调用上。换句话说,所有五个下载线程都进入了 InternetOpenURL,但只有三个线程立即返回并开始下载。我不知道为什么(这与 WinINET 有关,而不是 OTL)。

2) 取消不起作用,因为没有人告诉 DownloadFile 方法停止。 IOmniPipeline.Cancel 只是在每个管道上调用 CompleteAdding 并告诉每个阶段停止处理输入。它无法停止已经在输入元素上运行的代码(即您的 DownloadFile 方法)。你必须自己做。

一种方法是创建一个全局 Cancel 标志并更改 DownloadFile 以便它检查是否在以下循环中设置了该标志:

repeat
  InternetReadFile(hFile, @buffer, SizeOf(buffer), bytesRead);
  BlockWrite(localFile, buffer, bytesRead);
  Inc(FDownloaded, bytesRead);
  //In real app this event occurs in TALWinHttpClient from Alcinoe library.
  InternalDownloadProgress(Self, FDownloaded, FFileSize);
  if FCancelled then break; // <-----------
until bytesRead = 0;

您还可以更改 InternalDownloadProgress 并添加一个 var cancelled: boolean 参数,该参数可以在管道需要关闭时在事件处理程序中设置。