firemonkey idTcp 和记录
firemonkey idTcp and Record
下午好。
客户端向服务器发送一条消息,服务器响应向客户端发送两条消息。
客户端能收到这些消息,但备忘录(Memo)中只记录了服务器发送的第一个值。
请问这是什么原因?
服务器 ------------------------------------------ --------
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IDGlobal,
IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
FMX.Controls.Presentation, FMX.StdCtrls;
type
{ Fixed-size message record written to and read from the TCP connection. }
TRec_Data = record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Main form of the server application. It holds the Indy TCP server and the
  handlers attached to it in FormCreate. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
private
  { Private declarations }
public
  { Public declarations }
  // TCP server instance, created at run time in FormCreate.
  MainPort: TIdTCPServer;
  // Assigned to MainPort.OnConnect.
  procedure MainPortConnect(AContext: TIdContext);
  // Assigned to MainPort.OnExecute.
  procedure MainPortExecute(AContext: TIdContext);
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ TForm1 }
{ Creates the TCP server when the form is created, attaches the event
  handlers, configures the bindings and activates the server. }
procedure TForm1.FormCreate(Sender: TObject);
var
  Server: TIdTCPServer;
begin
  // NOTE(review): the server is created without an owner and this unit
  // never frees it.
  Server := TIdTCPServer.Create;
  MainPort := Server;

  Server.OnConnect := MainPortConnect;
  Server.OnExecute := MainPortExecute;

  // NOTE(review): Bindings.Add is called twice, so two separate binding
  // entries are created - one receives the IP, the other the port.
  Server.Bindings.Add.IP := '127.0.0.1';
  Server.Bindings.Add.Port := 6000;

  Server.Active := True;
end;
{ OnConnect handler of MainPort. }
procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
  { Intentionally empty: nothing is done when a client connects. }
end;
{ OnExecute handler of MainPort: every invocation writes two TRec_Data
  messages ('1', then '2') to the connected client. Nothing is read from
  the client here. }
procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Msg: TRec_Data;
begin
  // First message.
  Msg.Flag := '1';
  AContext.Connection.IOHandler.Write(RawToBytes(Msg, SizeOf(Msg)));

  // Second message.
  Msg.Flag := '2';
  AContext.Connection.IOHandler.Write(RawToBytes(Msg, SizeOf(Msg)));
end;
end.
客户端---------------------------------------- --------
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
System.Generics.Collections,
IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo;
type
{ Fixed-size message record written to and read from the TCP connection. }
TRec_Data = record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Worker thread that owns the TCP client, reads TRec_Data records from the
  connection and pushes them into the queue handed to its constructor. }
TMyThread = class(TThread)
private
  // Text of the most recently read record (written in Execute).
  Progress: string;
  // TCP client; created in the constructor and freed in Destroy.
  Client: TIdTCPClient;
  // Queue supplied by the creator; this class does not free it.
  FQueue: TThreadedQueue<TRec_Data>;
protected
  procedure Execute; override;
public
  constructor Create(const AQueue: TThreadedQueue<TRec_Data>);
  destructor Destroy; override;
end;
{ Main form of the client application. It owns the queue, the reader
  thread and the timer that moves queued records into the memo. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
  procedure FormDestroy(Sender: TObject);
private
  { Private declarations }
  // Queue filled by FMyThread and drained by OnTimer.
  FQueue: TThreadedQueue<TRec_Data>;
  // Reader thread created in FormCreate.
  FMyThread: TMyThread;
  // Polling timer created in FormCreate.
  Timer: TTimer;
  // Timer callback that pops a record and shows it in Memo1.
  procedure OnTimer(Sender: TObject);
public
  { Public declarations }
  Memo1: TMemo;
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ Builds the queue, the polling timer and the reader thread, then starts
  the thread. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  // Constructor arguments: queue depth 100, push timeout 1000, pop timeout 10.
  FQueue := TThreadedQueue<TRec_Data>.Create(100, 1000, 10);

  // The timer is owned by the form and fires OnTimer every 100 ms.
  Timer := TTimer.Create(Self);
  Timer.Interval := 100;
  Timer.OnTimer := OnTimer;
  Timer.Enabled := True;

  // The thread is constructed suspended (see TMyThread.Create), so it has
  // to be started explicitly.
  FMyThread := TMyThread.Create(FQueue);
  FMyThread.Start;
end;
{ Stops the reader thread and releases the thread, the timer and the
  queue, in that order. }
procedure TForm1.FormDestroy(Sender: TObject);
begin
  if FMyThread <> nil then
  begin
    // Ask the thread to stop and wait until Execute has returned before
    // destroying it.
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free;
  end;

  // TObject.Free is a no-op on a nil reference, so no explicit
  // Assigned() check is required for these two.
  Timer.Free;
  FQueue.Free;
end;
{ Timer callback: pops at most one record from the queue per tick and
  inserts its text at the top of the memo. }
procedure TForm1.OnTimer(Sender: TObject);
var
  Item: TRec_Data;
begin
  // Alternative considered by the author (drain everything per tick):
  //   while FQueue.PopItem(Item) = TWaitResult.wrSignaled do
  if FQueue.PopItem(Item) <> TWaitResult.wrSignaled then
    Exit;

  Form1.Memo1.Lines.Insert(0, Item.Flag);
end;
{ Creates the thread suspended, stores the shared queue, creates the TCP
  client, connects to 127.0.0.1:6000 and sends the initial 'addUser'
  record.

  AQueue - queue that Execute pushes received records into. }
constructor TMyThread.Create(const AQueue: TThreadedQueue<TRec_Data>);
var
  Hello: TRec_Data;
begin
  inherited Create(True);
  FQueue := AQueue;

  Client := TIdTCPClient.Create(nil);
  Client.Host := '127.0.0.1';
  Client.Port := 6000;
  // NOTE(review): Connect and the write below run inside the constructor,
  // i.e. in the thread that creates this object, not in Execute.
  Client.Connect;

  // Send the data
  if not Client.Connected then
    Exit;

  Hello.Flag := 'addUser';
  Client.IOHandler.Write(RawToBytes(Hello, SizeOf(Hello)));
end;
{ Releases the TCP client before running the inherited destructor. }
destructor TMyThread.Destroy;
begin
  // Free is safe on nil, so no Assigned() guard is needed.
  Client.Free;
  inherited;
end;
{ Thread body: until terminated, reads one TRec_Data-sized chunk from the
  connection, pushes the decoded record into the queue, then sleeps 10 ms.
  When the client is not connected it calls Connect instead. }
procedure TMyThread.Execute;
var
  Incoming: TRec_Data;
  Raw: TIdBytes;
begin
  while not Terminated do
  begin
    if not Client.Connected then
      Client.Connect
    else
    begin
      // NOTE(review): ReadBytes is called without its optional AAppend
      // argument and Raw is never emptied between iterations. If the Indy
      // default is to append (confirm against the Indy docs), BytesToRaw
      // below always decodes the first record ever received.
      Client.IOHandler.ReadBytes(Raw, SizeOf(Incoming));
      BytesToRaw(Raw, Incoming, SizeOf(Incoming));
      Progress := Incoming.Flag;
      // Synchronize(SetProgress);
      // NOTE(review): the result of PushItem is not checked.
      FQueue.PushItem(Incoming);
    end;

    TThread.Sleep(10);
  end;
end;
end.
在服务器端,您忽略了客户端的请求,并用无休止的响应淹没了连接。 TIdTCPServer.OnExecute
事件在连接的生命周期内连续循环调用,而不是在客户端发送请求时调用。
在客户端,您在一个线程中运行连续的读取循环,试图接收所有这些响应。但是您使用了 TThread.Sleep(),
这使得循环读取消息的速度比服务器生成消息的速度慢得多,从而导致网络流量拥塞。
但更糟糕的是,您阻碍了客户端处理服务器消息的能力。您的 UI 计时器以 100 毫秒的间隔运行,而读取线程以 10 毫秒的间隔运行。因此,每个计时器间隔最多可以将 10 条消息推入队列。您的 OnTimer
事件处理程序每个时间间隔仅弹出 1 条消息,队列中最多留下 9 条消息。所以很快(~1 秒),队列将填满其最大容量 100 条消息,并且 PushItem()
将开始忽略消息。您根本没有检查推送的错误/超时。
此外,我发现您的代码存在其他问题。
在服务器端,您正在泄漏 TIdTCPServer
对象,因为您没有为其分配 Owner
,也没有 Free
。而且,您的表单的 OnCreate
事件处理程序正在向 TIdTCPServer
添加 2 个单独的绑定 - 一个在 127.0.0.1:0
上,另一个在 0.0.0.0:6000
上。它应该只添加一个绑定 - 在 127.0.0.1:6000
上。
在客户端,创建线程时,不应在线程的构造函数中调用 TIdTCPClient.Connect()
或 TIdIOHandler.Write()
,它们仅属于线程的 Execute()
方法。
最后,我建议使用 TQueue<TRec_Data>
而不是 TThreadedQueue<TRec_Data>
。后者使用自己的内部线程来管理 push/pop 超时,在这种情况下这是浪费的开销。您可以使用 TMonitor
或 TEvent
来完成同样的事情,而无需额外的线程。
话虽如此,请尝试类似下面的做法:
服务器:
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IdGlobal,
IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
FMX.Controls.Presentation, FMX.StdCtrls;
type
{ Fixed-size message record written to and read from the TCP connection.
  Declared packed so no alignment padding is inserted. }
TRec_Data = packed record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Main form of the server application. It holds the Indy TCP server and the
  handlers attached to it in FormCreate. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
private
  { Private declarations }
public
  { Public declarations }
  // TCP server instance, created in FormCreate with the form as owner.
  MainPort: TIdTCPServer;
  // Assigned to MainPort.OnConnect.
  procedure MainPortConnect(AContext: TIdContext);
  // Assigned to MainPort.OnExecute.
  procedure MainPortExecute(AContext: TIdContext);
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ TForm1 }
{ Creates the TCP server (owned by the form, so it is released together
  with the form), attaches the event handlers, adds exactly one listening
  binding for 127.0.0.1:6000 and activates the server. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  MainPort := TIdTCPServer.Create(Self);
  MainPort.OnConnect := MainPortConnect;
  MainPort.OnExecute := MainPortExecute;

  // Add a single listening socket for 127.0.0.1:6000.
  // The binding is configured through a `with` block instead of a local
  // TIdSocketHandle variable: that type lives in the IdSocketHandle unit,
  // which is not listed in this unit's uses clause, so naming it here
  // would not compile.
  with MainPort.Bindings.Add do
  begin
    IP := '127.0.0.1';
    Port := 6000;
  end;

  MainPort.Active := True;
end;
{ OnConnect handler of MainPort. }
procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
  { Placeholder: per-client setup would go here. }
  //...
end;
{ OnExecute handler of MainPort. Indy calls it in a loop for the lifetime
  of each connection, so a single pass handles at most one client request:
  it waits briefly for a request, reads it, and only then sends the two
  reply messages ('1', then '2').

  Replying only after a request has been read keeps the server from
  flooding the client with unsolicited messages on every pass. }
procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  // check if the client has sent any messages waiting to be read...
  if AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    // Wait up to 100 ms rather than polling with a zero timeout, so an
    // idle connection does not spin this handler at full CPU speed.
    AContext.Connection.IOHandler.CheckForDataOnSource(100);
    AContext.Connection.IOHandler.CheckForDisconnect;
    // nothing arrived: return and let Indy call the handler again.
    if AContext.Connection.IOHandler.InputBufferIsEmpty then
      Exit;
  end;

  // read a pending client message and process it as needed...
  // AAppend = False: overwrite Buffer instead of appending to it.
  AContext.Connection.IOHandler.ReadBytes(Buffer, SizeOf(Rec), False);
  BytesToRaw(Buffer, Rec, SizeOf(Rec));
  //...

  // send the reply messages to the client...
  Rec.Flag := '1';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);

  Rec.Flag := '2';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);
end;
end.
客户端:
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
System.Generics.Collections,
IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo,
System.SyncObjs;
type
{ Fixed-size message record written to and read from the TCP connection.
  Declared packed so no alignment padding is inserted. }
TRec_Data = packed record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Worker thread that connects to the server, reads TRec_Data records and
  appends them to the queue handed to its constructor. }
TMyThread = class(TThread)
private
  // Queue supplied by the creator; access is guarded with TMonitor.
  FQueue: TQueue<TRec_Data>;
  // Signalled from TerminatedSet so a pending wait in Execute ends early.
  FTermEvent: TEvent;
protected
  procedure Execute; override;
  procedure TerminatedSet; override;
public
  constructor Create(const AQueue: TQueue<TRec_Data>); reintroduce;
  destructor Destroy; override;
end;
{ Main form of the client application. It owns the queue, the reader
  thread and the timer that moves queued records into the memo. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
  procedure FormDestroy(Sender: TObject);
private
  { Private declarations }
  // Queue filled by FMyThread and drained by OnTimer.
  FQueue: TQueue<TRec_Data>;
  // Reader thread created in FormCreate.
  FMyThread: TMyThread;
  // Polling timer created in FormCreate.
  Timer: TTimer;
  // Timer callback that drains the queue into Memo1.
  procedure OnTimer(Sender: TObject);
public
  { Public declarations }
  Memo1: TMemo;
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ Builds the queue, the polling timer and the reader thread. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  FQueue := TQueue<TRec_Data>.Create;

  // The timer is owned by the form and fires OnTimer every 100 ms.
  Timer := TTimer.Create(Self);
  Timer.Interval := 100;
  Timer.OnTimer := OnTimer;
  Timer.Enabled := True;

  // TMyThread.Create passes False to the inherited constructor, so no
  // explicit Start call is made here.
  FMyThread := TMyThread.Create(FQueue);
end;
{ Stops the reader thread and releases the thread, the timer and the
  queue, in that order. }
procedure TForm1.FormDestroy(Sender: TObject);
begin
  if FMyThread <> nil then
  begin
    // Ask the thread to stop and wait until Execute has returned before
    // destroying it; the queue must outlive the thread.
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free;
  end;

  // TObject.Free is a no-op on a nil reference, so no explicit
  // Assigned() check is required for these two.
  Timer.Free;
  FQueue.Free;
end;
{ Timer callback: if the queue lock can be taken within 10 ms, moves every
  queued record into the memo (newest on top); otherwise does nothing
  until the next tick. }
procedure TForm1.OnTimer(Sender: TObject);
var
  Item: TRec_Data;
begin
  // wait up to 10ms for the queue to be accessible...
  if TMonitor.Enter(FQueue, 10) then
  try
    // process all pending messages and remove them from the queue...
    while FQueue.Count > 0 do
    begin
      Item := FQueue.Dequeue;
      Memo1.Lines.Insert(0, Item.Flag);
    end;
  finally
    TMonitor.Exit(FQueue);
  end;
end;
{ Creates the thread (not suspended), stores the shared queue and creates
  the termination event.

  AQueue - queue that Execute appends received records to. }
constructor TMyThread.Create(const AQueue: TQueue<TRec_Data>);
begin
  inherited Create(False);
  FQueue := AQueue;

  // Manual-reset, initially non-signalled, unnamed event. It is used to
  // signal Execute() to exit immediately while it is waiting to call
  // Connect() again after a failed connection.
  FTermEvent := TEvent.Create(nil, True, False, '');
end;
{ Releases the termination event and then runs the inherited destructor.
  The implementation must use the `destructor` keyword to match the
  `destructor Destroy; override;` declaration in the class. }
destructor TMyThread.Destroy;
begin
  FTermEvent.Free;
  inherited;
end;
{ Override of TThread.TerminatedSet: signals the termination event so a
  pending FTermEvent.WaitFor in Execute returns at once. }
procedure TMyThread.TerminatedSet;
begin
  // Terminate() was called, signal Execute() now...
  FTermEvent.SetEvent;
end;
{ Thread body. Creates a TCP client for 127.0.0.1:6000 and, until the
  thread is terminated:
    1. tries to connect, waiting up to 5 s (or until Terminate) between
       failed attempts;
    2. sends the initial 'addUser' record;
    3. reads TRec_Data records from the server and appends each one to
       FQueue under a TMonitor lock;
    4. on any error disconnects and starts again from step 1. }
procedure TMyThread.Execute;
var
  Client: TIdTCPClient;
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  Client := TIdTCPClient.Create(nil);
  try
    Client.Host := '127.0.0.1';
    Client.Port := 6000;
    Client.ConnectTimeout := 5000;
    Client.ReadTimeout := 5000;

    while not Terminated do
    begin
      // try to connect to the server...
      try
        Client.Connect;
      except
        // wait 5 secs to try again (returns early when Terminate is called)...
        FTermEvent.WaitFor(5000);
        Continue;
      end;

      // connected...
      try
        try
          Rec.Flag := 'addUser';
          Buffer := RawToBytes(Rec, SizeOf(Rec));
          Client.IOHandler.Write(Buffer);

          // communicate with the server until disconnected or terminating...
          while not Terminated do
          begin
            // send other messages to the server as needed...

            // check if the server has sent any messages waiting to be read.
            // don't block the thread unless there is a message to read...
            if Client.IOHandler.InputBufferIsEmpty then
            begin
              Client.IOHandler.CheckForDataOnSource(100);
              Client.IOHandler.CheckForDisconnect;
              if Client.IOHandler.InputBufferIsEmpty then Continue;
            end;

            // read a message...
            // AAppend must be False here: Buffer still holds the bytes of
            // the previous message (initially 'addUser'), and the default
            // append mode would add the new bytes after them, so
            // BytesToRaw would keep decoding the stale first record and
            // Buffer would grow on every read.
            Client.IOHandler.ReadBytes(Buffer, SizeOf(Rec), False);
            BytesToRaw(Buffer, Rec, SizeOf(Rec));

            // wait up to 1 sec for the queue to be accessible...
            if not TMonitor.Enter(FQueue, 1000) then
            begin
              // can't add message to queue yet, do something ...
              // (as written, the message is dropped)
            end else
            begin
              // add message to queue...
              try
                FQueue.Enqueue(Rec);
              finally
                TMonitor.Exit(FQueue);
              end;
            end;
          end;
        finally
          Client.Disconnect;
          // Discard unread bytes left from this session so they cannot be
          // mistaken for data of the next connection or make the client
          // look "still connected" when Connect is called again.
          if Assigned(Client.IOHandler) then
            Client.IOHandler.InputBuffer.Clear;
        end;
      except
        // something unexpected happened, will reconnect and
        // try again if not terminated...
      end;
    end;
  finally
    Client.Free;
  end;
end;
end.
下午好。
客户端向服务器发送一条消息,服务器响应向客户端发送两条消息。
客户端能收到这些消息,但备忘录(Memo)中只记录了服务器发送的第一个值。
请问这是什么原因?
服务器 ------------------------------------------ --------
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IDGlobal,
IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
FMX.Controls.Presentation, FMX.StdCtrls;
type
{ Fixed-size message record written to and read from the TCP connection. }
TRec_Data = record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Main form of the server application. It holds the Indy TCP server and the
  handlers attached to it in FormCreate. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
private
  { Private declarations }
public
  { Public declarations }
  // TCP server instance, created at run time in FormCreate.
  MainPort: TIdTCPServer;
  // Assigned to MainPort.OnConnect.
  procedure MainPortConnect(AContext: TIdContext);
  // Assigned to MainPort.OnExecute.
  procedure MainPortExecute(AContext: TIdContext);
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ TForm1 }
{ Creates the TCP server when the form is created, attaches the event
  handlers, configures the bindings and activates the server. }
procedure TForm1.FormCreate(Sender: TObject);
var
  Server: TIdTCPServer;
begin
  // NOTE(review): the server is created without an owner and this unit
  // never frees it.
  Server := TIdTCPServer.Create;
  MainPort := Server;

  Server.OnConnect := MainPortConnect;
  Server.OnExecute := MainPortExecute;

  // NOTE(review): Bindings.Add is called twice, so two separate binding
  // entries are created - one receives the IP, the other the port.
  Server.Bindings.Add.IP := '127.0.0.1';
  Server.Bindings.Add.Port := 6000;

  Server.Active := True;
end;
{ OnConnect handler of MainPort. }
procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
  { Intentionally empty: nothing is done when a client connects. }
end;
{ OnExecute handler of MainPort: every invocation writes two TRec_Data
  messages ('1', then '2') to the connected client. Nothing is read from
  the client here. }
procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Msg: TRec_Data;
begin
  // First message.
  Msg.Flag := '1';
  AContext.Connection.IOHandler.Write(RawToBytes(Msg, SizeOf(Msg)));

  // Second message.
  Msg.Flag := '2';
  AContext.Connection.IOHandler.Write(RawToBytes(Msg, SizeOf(Msg)));
end;
end.
客户端---------------------------------------- --------
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
System.Generics.Collections,
IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo;
type
{ Fixed-size message record written to and read from the TCP connection. }
TRec_Data = record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Worker thread that owns the TCP client, reads TRec_Data records from the
  connection and pushes them into the queue handed to its constructor. }
TMyThread = class(TThread)
private
  // Text of the most recently read record (written in Execute).
  Progress: string;
  // TCP client; created in the constructor and freed in Destroy.
  Client: TIdTCPClient;
  // Queue supplied by the creator; this class does not free it.
  FQueue: TThreadedQueue<TRec_Data>;
protected
  procedure Execute; override;
public
  constructor Create(const AQueue: TThreadedQueue<TRec_Data>);
  destructor Destroy; override;
end;
{ Main form of the client application. It owns the queue, the reader
  thread and the timer that moves queued records into the memo. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
  procedure FormDestroy(Sender: TObject);
private
  { Private declarations }
  // Queue filled by FMyThread and drained by OnTimer.
  FQueue: TThreadedQueue<TRec_Data>;
  // Reader thread created in FormCreate.
  FMyThread: TMyThread;
  // Polling timer created in FormCreate.
  Timer: TTimer;
  // Timer callback that pops a record and shows it in Memo1.
  procedure OnTimer(Sender: TObject);
public
  { Public declarations }
  Memo1: TMemo;
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ Builds the queue, the polling timer and the reader thread, then starts
  the thread. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  // Constructor arguments: queue depth 100, push timeout 1000, pop timeout 10.
  FQueue := TThreadedQueue<TRec_Data>.Create(100, 1000, 10);

  // The timer is owned by the form and fires OnTimer every 100 ms.
  Timer := TTimer.Create(Self);
  Timer.Interval := 100;
  Timer.OnTimer := OnTimer;
  Timer.Enabled := True;

  // The thread is constructed suspended (see TMyThread.Create), so it has
  // to be started explicitly.
  FMyThread := TMyThread.Create(FQueue);
  FMyThread.Start;
end;
{ Stops the reader thread and releases the thread, the timer and the
  queue, in that order. }
procedure TForm1.FormDestroy(Sender: TObject);
begin
  if FMyThread <> nil then
  begin
    // Ask the thread to stop and wait until Execute has returned before
    // destroying it.
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free;
  end;

  // TObject.Free is a no-op on a nil reference, so no explicit
  // Assigned() check is required for these two.
  Timer.Free;
  FQueue.Free;
end;
{ Timer callback: pops at most one record from the queue per tick and
  inserts its text at the top of the memo. }
procedure TForm1.OnTimer(Sender: TObject);
var
  Item: TRec_Data;
begin
  // Alternative considered by the author (drain everything per tick):
  //   while FQueue.PopItem(Item) = TWaitResult.wrSignaled do
  if FQueue.PopItem(Item) <> TWaitResult.wrSignaled then
    Exit;

  Form1.Memo1.Lines.Insert(0, Item.Flag);
end;
{ Creates the thread suspended, stores the shared queue, creates the TCP
  client, connects to 127.0.0.1:6000 and sends the initial 'addUser'
  record.

  AQueue - queue that Execute pushes received records into. }
constructor TMyThread.Create(const AQueue: TThreadedQueue<TRec_Data>);
var
  Hello: TRec_Data;
begin
  inherited Create(True);
  FQueue := AQueue;

  Client := TIdTCPClient.Create(nil);
  Client.Host := '127.0.0.1';
  Client.Port := 6000;
  // NOTE(review): Connect and the write below run inside the constructor,
  // i.e. in the thread that creates this object, not in Execute.
  Client.Connect;

  // Send the data
  if not Client.Connected then
    Exit;

  Hello.Flag := 'addUser';
  Client.IOHandler.Write(RawToBytes(Hello, SizeOf(Hello)));
end;
{ Releases the TCP client before running the inherited destructor. }
destructor TMyThread.Destroy;
begin
  // Free is safe on nil, so no Assigned() guard is needed.
  Client.Free;
  inherited;
end;
{ Thread body: until terminated, reads one TRec_Data-sized chunk from the
  connection, pushes the decoded record into the queue, then sleeps 10 ms.
  When the client is not connected it calls Connect instead. }
procedure TMyThread.Execute;
var
  Incoming: TRec_Data;
  Raw: TIdBytes;
begin
  while not Terminated do
  begin
    if not Client.Connected then
      Client.Connect
    else
    begin
      // NOTE(review): ReadBytes is called without its optional AAppend
      // argument and Raw is never emptied between iterations. If the Indy
      // default is to append (confirm against the Indy docs), BytesToRaw
      // below always decodes the first record ever received.
      Client.IOHandler.ReadBytes(Raw, SizeOf(Incoming));
      BytesToRaw(Raw, Incoming, SizeOf(Incoming));
      Progress := Incoming.Flag;
      // Synchronize(SetProgress);
      // NOTE(review): the result of PushItem is not checked.
      FQueue.PushItem(Incoming);
    end;

    TThread.Sleep(10);
  end;
end;
end.
在服务器端,您忽略了客户端的请求,并用无休止的响应淹没了连接。 TIdTCPServer.OnExecute
事件在连接的生命周期内连续循环调用,而不是在客户端发送请求时调用。
在客户端,您在一个线程中运行连续的读取循环,试图接收所有这些响应。但是您使用了 TThread.Sleep(),
这使得循环读取消息的速度比服务器生成消息的速度慢得多,从而导致网络流量拥塞。
但更糟糕的是,您阻碍了客户端处理服务器消息的能力。您的 UI 计时器以 100 毫秒的间隔运行,而读取线程以 10 毫秒的间隔运行。因此,每个计时器间隔最多可以将 10 条消息推入队列。您的 OnTimer
事件处理程序每个时间间隔仅弹出 1 条消息,队列中最多留下 9 条消息。所以很快(~1 秒),队列将填满其最大容量 100 条消息,并且 PushItem()
将开始忽略消息。您根本没有检查推送的错误/超时。
此外,我发现您的代码存在其他问题。
在服务器端,您正在泄漏 TIdTCPServer
对象,因为您没有为其分配 Owner
,也没有 Free
。而且,您的表单的 OnCreate
事件处理程序正在向 TIdTCPServer
添加 2 个单独的绑定 - 一个在 127.0.0.1:0
上,另一个在 0.0.0.0:6000
上。它应该只添加一个绑定 - 在 127.0.0.1:6000
上。
在客户端,创建线程时,不应在线程的构造函数中调用 TIdTCPClient.Connect()
或 TIdIOHandler.Write()
,它们仅属于线程的 Execute()
方法。
最后,我建议使用 TQueue<TRec_Data>
而不是 TThreadedQueue<TRec_Data>
。后者使用自己的内部线程来管理 push/pop 超时,在这种情况下这是浪费的开销。您可以使用 TMonitor
或 TEvent
来完成同样的事情,而无需额外的线程。
话虽如此,请尝试类似下面的做法:
服务器:
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, IdGlobal,
IdBaseComponent, IdComponent, IdCustomTCPServer, IdTCPServer, IdContext,
FMX.Controls.Presentation, FMX.StdCtrls;
type
{ Fixed-size message record written to and read from the TCP connection.
  Declared packed so no alignment padding is inserted. }
TRec_Data = packed record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Main form of the server application. It holds the Indy TCP server and the
  handlers attached to it in FormCreate. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
private
  { Private declarations }
public
  { Public declarations }
  // TCP server instance, created in FormCreate with the form as owner.
  MainPort: TIdTCPServer;
  // Assigned to MainPort.OnConnect.
  procedure MainPortConnect(AContext: TIdContext);
  // Assigned to MainPort.OnExecute.
  procedure MainPortExecute(AContext: TIdContext);
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ TForm1 }
{ Creates the TCP server (owned by the form, so it is released together
  with the form), attaches the event handlers, adds exactly one listening
  binding for 127.0.0.1:6000 and activates the server. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  MainPort := TIdTCPServer.Create(Self);
  MainPort.OnConnect := MainPortConnect;
  MainPort.OnExecute := MainPortExecute;

  // Add a single listening socket for 127.0.0.1:6000.
  // The binding is configured through a `with` block instead of a local
  // TIdSocketHandle variable: that type lives in the IdSocketHandle unit,
  // which is not listed in this unit's uses clause, so naming it here
  // would not compile.
  with MainPort.Bindings.Add do
  begin
    IP := '127.0.0.1';
    Port := 6000;
  end;

  MainPort.Active := True;
end;
{ OnConnect handler of MainPort. }
procedure TForm1.MainPortConnect(AContext: TIdContext);
begin
  { Placeholder: per-client setup would go here. }
  //...
end;
{ OnExecute handler of MainPort. Indy calls it in a loop for the lifetime
  of each connection, so a single pass handles at most one client request:
  it waits briefly for a request, reads it, and only then sends the two
  reply messages ('1', then '2').

  Replying only after a request has been read keeps the server from
  flooding the client with unsolicited messages on every pass. }
procedure TForm1.MainPortExecute(AContext: TIdContext);
var
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  // check if the client has sent any messages waiting to be read...
  if AContext.Connection.IOHandler.InputBufferIsEmpty then
  begin
    // Wait up to 100 ms rather than polling with a zero timeout, so an
    // idle connection does not spin this handler at full CPU speed.
    AContext.Connection.IOHandler.CheckForDataOnSource(100);
    AContext.Connection.IOHandler.CheckForDisconnect;
    // nothing arrived: return and let Indy call the handler again.
    if AContext.Connection.IOHandler.InputBufferIsEmpty then
      Exit;
  end;

  // read a pending client message and process it as needed...
  // AAppend = False: overwrite Buffer instead of appending to it.
  AContext.Connection.IOHandler.ReadBytes(Buffer, SizeOf(Rec), False);
  BytesToRaw(Buffer, Rec, SizeOf(Rec));
  //...

  // send the reply messages to the client...
  Rec.Flag := '1';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);

  Rec.Flag := '2';
  Buffer := RawToBytes(Rec, SizeOf(Rec));
  AContext.Connection.IOHandler.Write(Buffer);
end;
end.
客户端:
unit Unit1;
interface
uses
System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants,
FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs,
System.Generics.Collections,
IdTCPClient, IdGlobal, FMX.Controls.Presentation, FMX.ScrollBox, FMX.Memo,
System.SyncObjs;
type
{ Fixed-size message record written to and read from the TCP connection.
  Declared packed so no alignment padding is inserted. }
TRec_Data = packed record
  // Text payload: 21 characters, indices 0..20.
  Flag: array [0 .. 20] of Char;
end;
{ Worker thread that connects to the server, reads TRec_Data records and
  appends them to the queue handed to its constructor. }
TMyThread = class(TThread)
private
  // Queue supplied by the creator; access is guarded with TMonitor.
  FQueue: TQueue<TRec_Data>;
  // Signalled from TerminatedSet so a pending wait in Execute ends early.
  FTermEvent: TEvent;
protected
  procedure Execute; override;
  procedure TerminatedSet; override;
public
  constructor Create(const AQueue: TQueue<TRec_Data>); reintroduce;
  destructor Destroy; override;
end;
{ Main form of the client application. It owns the queue, the reader
  thread and the timer that moves queued records into the memo. }
TForm1 = class(TForm)
  procedure FormCreate(Sender: TObject);
  procedure FormDestroy(Sender: TObject);
private
  { Private declarations }
  // Queue filled by FMyThread and drained by OnTimer.
  FQueue: TQueue<TRec_Data>;
  // Reader thread created in FormCreate.
  FMyThread: TMyThread;
  // Polling timer created in FormCreate.
  Timer: TTimer;
  // Timer callback that drains the queue into Memo1.
  procedure OnTimer(Sender: TObject);
public
  { Public declarations }
  Memo1: TMemo;
end;
var
Form1: TForm1;
implementation
{$R *.fmx}
{ Builds the queue, the polling timer and the reader thread. }
procedure TForm1.FormCreate(Sender: TObject);
begin
  FQueue := TQueue<TRec_Data>.Create;

  // The timer is owned by the form and fires OnTimer every 100 ms.
  Timer := TTimer.Create(Self);
  Timer.Interval := 100;
  Timer.OnTimer := OnTimer;
  Timer.Enabled := True;

  // TMyThread.Create passes False to the inherited constructor, so no
  // explicit Start call is made here.
  FMyThread := TMyThread.Create(FQueue);
end;
{ Stops the reader thread and releases the thread, the timer and the
  queue, in that order. }
procedure TForm1.FormDestroy(Sender: TObject);
begin
  if FMyThread <> nil then
  begin
    // Ask the thread to stop and wait until Execute has returned before
    // destroying it; the queue must outlive the thread.
    FMyThread.Terminate;
    FMyThread.WaitFor;
    FMyThread.Free;
  end;

  // TObject.Free is a no-op on a nil reference, so no explicit
  // Assigned() check is required for these two.
  Timer.Free;
  FQueue.Free;
end;
{ Timer callback: if the queue lock can be taken within 10 ms, moves every
  queued record into the memo (newest on top); otherwise does nothing
  until the next tick. }
procedure TForm1.OnTimer(Sender: TObject);
var
  Item: TRec_Data;
begin
  // wait up to 10ms for the queue to be accessible...
  if TMonitor.Enter(FQueue, 10) then
  try
    // process all pending messages and remove them from the queue...
    while FQueue.Count > 0 do
    begin
      Item := FQueue.Dequeue;
      Memo1.Lines.Insert(0, Item.Flag);
    end;
  finally
    TMonitor.Exit(FQueue);
  end;
end;
{ Creates the thread (not suspended), stores the shared queue and creates
  the termination event.

  AQueue - queue that Execute appends received records to. }
constructor TMyThread.Create(const AQueue: TQueue<TRec_Data>);
begin
  inherited Create(False);
  FQueue := AQueue;

  // Manual-reset, initially non-signalled, unnamed event. It is used to
  // signal Execute() to exit immediately while it is waiting to call
  // Connect() again after a failed connection.
  FTermEvent := TEvent.Create(nil, True, False, '');
end;
{ Releases the termination event and then runs the inherited destructor.
  The implementation must use the `destructor` keyword to match the
  `destructor Destroy; override;` declaration in the class. }
destructor TMyThread.Destroy;
begin
  FTermEvent.Free;
  inherited;
end;
{ Override of TThread.TerminatedSet: signals the termination event so a
  pending FTermEvent.WaitFor in Execute returns at once. }
procedure TMyThread.TerminatedSet;
begin
  // Terminate() was called, signal Execute() now...
  FTermEvent.SetEvent;
end;
{ Thread body. Creates a TCP client for 127.0.0.1:6000 and, until the
  thread is terminated:
    1. tries to connect, waiting up to 5 s (or until Terminate) between
       failed attempts;
    2. sends the initial 'addUser' record;
    3. reads TRec_Data records from the server and appends each one to
       FQueue under a TMonitor lock;
    4. on any error disconnects and starts again from step 1. }
procedure TMyThread.Execute;
var
  Client: TIdTCPClient;
  Rec: TRec_Data;
  Buffer: TIdBytes;
begin
  Client := TIdTCPClient.Create(nil);
  try
    Client.Host := '127.0.0.1';
    Client.Port := 6000;
    Client.ConnectTimeout := 5000;
    Client.ReadTimeout := 5000;

    while not Terminated do
    begin
      // try to connect to the server...
      try
        Client.Connect;
      except
        // wait 5 secs to try again (returns early when Terminate is called)...
        FTermEvent.WaitFor(5000);
        Continue;
      end;

      // connected...
      try
        try
          Rec.Flag := 'addUser';
          Buffer := RawToBytes(Rec, SizeOf(Rec));
          Client.IOHandler.Write(Buffer);

          // communicate with the server until disconnected or terminating...
          while not Terminated do
          begin
            // send other messages to the server as needed...

            // check if the server has sent any messages waiting to be read.
            // don't block the thread unless there is a message to read...
            if Client.IOHandler.InputBufferIsEmpty then
            begin
              Client.IOHandler.CheckForDataOnSource(100);
              Client.IOHandler.CheckForDisconnect;
              if Client.IOHandler.InputBufferIsEmpty then Continue;
            end;

            // read a message...
            // AAppend must be False here: Buffer still holds the bytes of
            // the previous message (initially 'addUser'), and the default
            // append mode would add the new bytes after them, so
            // BytesToRaw would keep decoding the stale first record and
            // Buffer would grow on every read.
            Client.IOHandler.ReadBytes(Buffer, SizeOf(Rec), False);
            BytesToRaw(Buffer, Rec, SizeOf(Rec));

            // wait up to 1 sec for the queue to be accessible...
            if not TMonitor.Enter(FQueue, 1000) then
            begin
              // can't add message to queue yet, do something ...
              // (as written, the message is dropped)
            end else
            begin
              // add message to queue...
              try
                FQueue.Enqueue(Rec);
              finally
                TMonitor.Exit(FQueue);
              end;
            end;
          end;
        finally
          Client.Disconnect;
          // Discard unread bytes left from this session so they cannot be
          // mistaken for data of the next connection or make the client
          // look "still connected" when Connect is called again.
          if Assigned(Client.IOHandler) then
            Client.IOHandler.InputBuffer.Clear;
        end;
      except
        // something unexpected happened, will reconnect and
        // try again if not terminated...
      end;
    end;
  finally
    Client.Free;
  end;
end;
end.