firemonkey idTcp 和记录

firemonkey idTcp and Record

下午好。

客户端向服务器发送一条消息,服务器响应向客户端发送两条消息。

客户端看到这些消息,但备忘录记录了服务器发送的第一个值。

提示什么原因

服务器 ------------------------------------------ --------

unit Unit1;

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IDGlobal,
  IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
  FMX.Controls.Presentation, FMX.StdCtrls;

type
  TRec_Data = record
    Flag: array[0..20] of char;
  end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
  private
    { Private declarations }
  public
    MainPort: TIdTCPServer;
    procedure MainPortConnect(AContext: TIdContext);
    procedure MainPortExecute(AContext: TIdContext);
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.fmx}

{ TForm1 }

procedure TForm1.FormCreate(Sender: TObject);
begin
  MainPort := TIdTCPServer.Create;
  MainPort.OnConnect :=  MainPortConnect;
  MainPort.OnExecute := MainPortExecute;
  MainPort.Bindings.Add.IP   := '127.0.0.1';
  MainPort.Bindings.Add.Port := 6000;
  MainPort.Active := True;
end;

procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
//
end;

procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  Rec.Flag := '1';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);

  Rec.Flag := '2';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);

end;

end.

客户端---------------------------------------- --------

    unit Unit1;

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
  System.Generics.Collections,
  IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo;

type
  TRec_Data = record
    Flag: array[0..20] of char;
  end;

  TMyThread = class(TThread)
  private
    Progress: string;
    Client : TIdTCPClient;
    FQueue : TThreadedQueue<TRec_Data>;
  protected
    procedure Execute; override;
  public
    constructor Create(const AQueue : TThreadedQueue<TRec_Data>);
    destructor Destroy; override;
  end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    { Private declarations }
    FQueue : TThreadedQueue<TRec_Data>;
    FMyThread : TMyThread;
    Timer : TTimer;
    procedure OnTimer(Sender: TObject);
  public
    Memo1: TMemo;
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.fmx}

procedure TForm1.FormCreate(Sender: TObject);
begin
  FQueue:=TThreadedQueue<TRec_Data>.Create(100, 1000, 10);

  Timer:=TTimer.Create(Self);
  Timer.Interval:=100;
  Timer.OnTimer:=OnTimer;
  Timer.Enabled:=True;

  FMyThread:=TMyThread.Create(FQueue);
  FMyThread.Start;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  if Assigned(FMyThread) then
  begin
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free
  end;
  if Assigned(Timer) then
    Timer.Free;
  if Assigned(FQueue) then
    FQueue.Free;
end;

procedure TForm1.OnTimer(Sender: TObject);
Var ARec : TRec_Data;
begin
//  while FQueue.PopItem(ARec) = TWaitResult.wrSignaled do или
  if FQueue.PopItem(ARec) = TWaitResult.wrSignaled then
    Form1.Memo1.Lines.Insert(0, ARec.Flag);
end;

constructor TMyThread.Create(const AQueue : TThreadedQueue<TRec_Data>);
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  inherited Create(true);

  FQueue:=AQueue;

  Client := TIdTCPClient.Create(nil);
  Client.Host := '127.0.0.1';
  Client.Port := 6000;
  Client.Connect;

  // Передаем данные
  if Client.Connected = True then
  begin
    Rec.Flag := 'addUser';

    Buffer := RawToBytes(Rec, SizeOf(Rec));
    Client.IOHandler.Write(Buffer);
  end;
end;

destructor TMyThread.Destroy;
begin
  if Assigned(Client) then
    Client.Free;
  inherited;
end;

procedure TMyThread.Execute;
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  while Not Terminated do
  begin
    if Client.Connected then
    begin
      Client.IOHandler.ReadBytes(Buffer, SizeOf(Rec));
      BytesToRaw(Buffer, Rec, SizeOf(Rec));
      Progress := Rec.Flag;
//      Synchronize(SetProgress);
      FQueue.PushItem(Rec);
    end
    else
      Client.Connect;
    TThread.Sleep(10);
  end;
end;


end.

在服务器端,您忽略了客户端的请求,并用无休止的响应淹没了连接。 TIdTCPServer.OnExecute 事件在连接的生命周期内连续循环调用,而不是在客户端发送请求时调用。

在客户端,您是 运行 一个线程中的连续阅读循环,试图接收所有这些响应。但是您使用 TThread.Sleep() 确保循环读取消息的速度比服务器生成消息的速度慢得多,从而导致网络流量拥塞。

但更糟糕的是,您阻碍了客户端处理服务器消息的能力。您的 UI 计时器以 100 毫秒的间隔运行,而读取线程以 10 毫秒的间隔运行。因此,每个计时器间隔最多可以将 10 条消息推入队列。您的 OnTimer 事件处理程序每​​个时间间隔仅弹出 1 条消息,队列中最多留下 9 条消息。所以很快(~1 秒),队列将填满其最大容量 100 条消息,并且 PushItem() 将开始忽略消息。您根本没有检查推送 errors/timeouts。

此外,我发现您的代码存在其他问题。

在服务器端,您正在泄漏 TIdTCPServer 对象,因为您没有为其分配 Owner,也没有 Free。而且,您的表单的 OnCreate 事件处理程序正在向 TIdTCPServer 添加 2 个单独的绑定 - 一个在 127.0.0.1:0 上,另一个在 0.0.0.0:6000 上。它应该只添加一个绑定 - on 127.0.0.1:6000.

在客户端,创建线程时,不应在线程的构造函数中调用 TIdTCPClient.Connect()TIdIOHandler.Write(),它们仅属于线程的 Execute() 方法。

最后,我建议使用 TQueue<TRec_Data> 而不是 TThreadedQueue<TRec_Data>。后者使用自己的内部线程来管理 push/pop 超时,在这种情况下这是浪费的开销。您可以使用 TMonitorTEvent 来完成同样的事情,而无需额外的线程。

话虽如此,请尝试更类似的方法:

服务器:

unit Unit1;

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IdGlobal,
  IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
  FMX.Controls.Presentation, FMX.StdCtrls;

type
  TRec_Data = packed record
    Flag: array[0..20] of char;
  end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
  private
    { Private declarations }
  public
    MainPort: TIdTCPServer;
    procedure MainPortConnect(AContext: TIdContext);
    procedure MainPortExecute(AContext: TIdContext);
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.fmx}

{ TForm1 }

procedure TForm1.FormCreate(Sender: TObject);
var
  Binding: TIdSocketHandle;
begin
  MainPort := TIdTCPServer.Create(Self);
  MainPort.OnConnect := MainPortConnect;
  MainPort.OnExecute := MainPortExecute;

  // and a single listening socket for 127.0.0.1:6000
  Binding := MainPort.Bindings.Add;
  Binding.IP := '127.0.0.1';
  Binding.Port := 6000;

  MainPort.Active := True;
end;

procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
  //...
end;

procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  // check if the client has sent any messages waiting to be read...
  if AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    AContext.Connection.IOHandler.CheckForDataOnSource(0);
    AContext.Connection.IOHandler.CheckForDisconnect;
  end;

  if not AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    // read a pending client message and process it as needed...
    AContext.Connection.IOHandler.ReadBytes(Buffer, SizeOf(Rec));
    BytesToRaw(Buffer, Rec, SizeOf(Rec));
    //...
  end;

  // send messages to the client...

  Rec.Flag := '1';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);

  Rec.Flag := '2';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);    
end;

end.

客户:

unit Unit1;

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
  System.Generics.Collections,
  IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo,
  System.SyncObjs;

type
  TRec_Data = packet record
    Flag: array[0..20] of char;
  end;

  TMyThread = class(TThread)
  private
    FQueue : TQueue<TRec_Data>;
    FTermEvent : TEvent;
  protected
    procedure Execute; override;
    procedure TerminatedSet; override;
  public
    constructor Create(const AQueue : TQueue<TRec_Data>); reintroduce;
    destructor Destroy; override;
  end;

  TForm1 = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    { Private declarations }
    FQueue : TQueue<TRec_Data>;
    FMyThread : TMyThread;
    Timer : TTimer;
    procedure OnTimer(Sender: TObject);
  public
    Memo1: TMemo;
    { Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.fmx}

procedure TForm1.FormCreate(Sender: TObject);
begin
  FQueue := TQueue<TRec_Data>.Create;

  Timer := TTimer.Create(Self);
  Timer.Interval := 100;
  Timer.OnTimer := OnTimer;
  Timer.Enabled := True;

  FMyThread := TMyThread.Create(FQueue);
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  if Assigned(FMyThread) then
  begin
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free;
  end;

  if Assigned(Timer) then
    Timer.Free;

  if Assigned(FQueue) then
    FQueue.Free;
end;

procedure TForm1.OnTimer(Sender: TObject);
var
  ARec : TRec_Data;
begin
  // wait up to 10ms for the queue to be accessible...
  if not TMonitor.Enter(FQueue, 10) then Exit;
  try
    // process all pending messages and remove them from the queue...
    while FQueue.Count > 0 do
    begin
      ARec := FQueue.Dequeue;
      Memo1.Lines.Insert(0, ARec.Flag);
    end;
  finally
    TMonitor.Exit(FQueue);
  end;
end;

constructor TMyThread.Create(const AQueue : TQueue<TRec_Data>);
begin
  inherited Create(false);
  FQueue := AQueue;

  // used to signal Execute() to exit immediately while waiting
  // to call Connect() after a failed connection...
  FTermEvent := TEvent.Create(nil, true, false, '');
end;

procedure TMyThread.Destroy;
begin
  FTermEvent.Free;
  inherited;
end;

procedure TMyThread.TerminatedSet;
begin
  // Terminate() was called, signal Execute() now...
  FTermEvent.SetEvent;
end;

procedure TMyThread.Execute;
var
  Client: TIdTCPClient;
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  Client := TIdTCPClient.Create(nil);
  try
    Client.Host := '127.0.0.1';
    Client.Port := 6000;
    Client.ConnectTimeout := 5000;
    Client.ReadTimeout := 5000;

    while not Terminated do
    begin
      // try to connect to the server...
      try
        Client.Connect;
      except
        // wait 5 secs to try again...
        FTermEvent.WaitFor(5000);
        Continue;
      end;

      // connected...

      try
        try
          Rec.Flag := 'addUser';    
          Buffer := RawToBytes(Rec, SizeOf(Rec));
          Client.IOHandler.Write(Buffer);

          // communicate with the server until disconnected or terminating...
          while not Terminated do
          begin
            // send other messages to the server as needed...

            // check if the server has sent any messages waiting to be read.
            // don't block the thread unless there is a message to read...
            if Client.IOHandler.InputBufferIsEmpty then
            begin
              Client.IOHandler.CheckForDataOnSource(100);
              Client.IOHandler.CheckForDisconnect;
              if Client.IOHandler.InputBufferIsEmpty then Continue;
            end;

            // read a message...
            Client.IOHandler.ReadBytes(Buffer, SizeOf(Rec));
            BytesToRaw(Buffer, Rec, SizeOf(Rec));

            // wait up to 1 sec for the queue to be accessible...
            if not TMonitor.Enter(FQueue, 1000) then
            begin
              // can't add message to queue yet, do something ...
            end else
            begin
              // add message to queue...
              try
                FQueue.Enqueue(Rec);
              finally
                TMonitor.Exit(FQueue);
              end;
            end;
          end;
        finally
          Client.Disconnect;
        end;
      except
        // something unexpected happened, will reconnect and
        // try again if not terminated...
      end;
    end;
  finally
    Client.Free;
  end;
end;    

end.