无法从 Datasnap 服务器检索大于 260.000 字节的 TStream

Can't retrieve TStreams bigger than around 260.000 bytes from a Datasnap Server

我有一个 Delphi 10.1 Berlin Datasnap Server,它不能 return 数据包(通过 TStream)大于大约 260.000 字节。

我按照 Delphi 中的 \Object Pascal\DataSnap\FireDAC 示例对其进行了编程,它也显示了这个问题。

只需打开该示例,将 ServerMethodsUnit.pas 上的 qOrders 组件的 IndexFieldName 设置为空白,并将其 SQL 属性 更改为:

即可看到问题
select * from Orders
union 
select * from Orders

现在要发送的数据量超过了 260.000 字节,这似乎是您无法从客户端检索它的地步。获取 EFDException [FireDAC][Stan]-710。无效的二进制存储格式。

数据作为您从服务器上的 FDSchemaAdapter 获得的 Stream 发送,然后加载到客户端上的另一个 FDSchemaAdpater。 Client和Server之间的连接也是FireDAC。

这就是服务器 return 流的方式:

function TServerMethods.StreamGet: TStream;
begin
  Result := TMemoryStream.Create;
  try
    qCustomers.Close;
    qCustomers.Open;
    qOrders.Close;
    qOrders.Open;
    FDSchemaAdapter.SaveToStream(Result, TFDStorageFormat.sfBinary);
    Result.Position := 0;
  except
    raise;
  end;
end;

这就是客户端检索它的方式:

procedure TClientForm.GetTables;
var
  LStringStream: TStringStream;
begin
  FDStoredProcGet.ExecProc;
  LStringStream := TStringStream.Create(FDStoredProcGet.Params[0].asBlob);
  try
    if LStringStream <> nil then
    begin
      LStringStream.Position := 0;
      DataModuleFDClient.FDSchemaAdapter.LoadFromStream(LStringStream, TFDStorageFormat.sfBinary);
    end;
  finally
    LStringStream.Free;
  end;
end;

Client 没有获取 Blob 参数上的所有数据。我在Server端保存了Stream的内容,在Client端保存了到达Blob参数的内容,大小一样,但是Blob参数的内容被截断了,最后几Kbytes为0 .

这就是我在服务器上保存将转到流的内容的方式:

FDSchemaAdapter.SaveToFile('C:\Temp\JSON_Server.json', TFDStorageFormat.sfJSON);

这是我检查客户端 blob 参数的方法:

TFile.WriteAllText('C:\Temp\JSON_Client.json', FDStoredProcGet.Params[0].asBlob);

我可以看到客户端截断了数据。

您知道如何修复它,或者将所有流内容从 Datasnap 服务器检索到我的客户端的解决方法吗?

更新:我已经更新到Delphi 10.1 Berlin Update 2,但问题依旧。

谢谢。

在服务器端压缩流,在客户端解压。 Delphi 10.1 提供了必要的 类(System.ZLib.TZCompressionStreamSystem.ZLib.TZDecompressionStream)。联机文档包含一个 example,它展示了如何使用这些例程来压缩和解压缩流中的数据。将输出保存到 ZIP 文件以检查它是否小于 260 KB。

解决方法:运行 为大文件请求提供服务的 HTTP 服务器。代码生成并存储文件,如您的问题所示,returns 其 URL 到客户端:

https://example.com/ds/... -> for the DataSnap service

https://example.com/files/... -> for big files

如果您已经使用 Apache 作为反向代理,您可以将 Apache 配置为将 HTTP GET 请求路由到 /files/ 中的资源。

要获得更多控制(身份验证),您可以 运行 在不同端口上的 HTTP 服务器(基于 Indy)为这些文件的请求提供服务。 Apache 可能被配置为将 HTTP 请求映射到正确的目的地,客户端将只能看到一个 HTTP 端口。

我在西雅图(我没有安装柏林)使用 DataSnap 时遇到了类似的问题 不涉及 FireDAC 的服务器。

在我的 DataSnap 服务器上,我有:

type
  TServerMethods1 = class(TDSServerModule)
  public
    function GetStream(Size: Integer): TStream;
    function GetString(Size: Integer): String;
  end;

[...]

uses System.StrUtils;

function BuildString(Size : Integer) : String;
var
  S : String;
  Count,
  LeftToWrite : Integer;
const
  scBlock = '%8d bytes'#13#10;
begin
  LeftToWrite := Size;
  Count := 1;
  while Count <= Size do begin
    S := Format(scBlock, [Count]);
    if LeftToWrite >= Length(S) then
    else
      S := Copy(S, 1, LeftToWrite);
    Result := Result + S;
    Inc(Count, Length(S));
    Dec(LeftToWrite, Length(S));
  end;
  if Length(Result) > 0 then
    Result[Length(Result)] := '.'
end;

function TServerMethods1.GetStream(Size : Integer): TStream;
var
  SS : TStringStream;
begin
  SS := TStringStream.Create;
  SS.WriteString(BuildString(Size));
  SS.Position := 0;
  OutputDebugString('Quality Suite:TRACING:ON');
  Result := SS;
end;

function TServerMethods1.GetString(Size : Integer): String;
begin
  Result := BuildString(Size);
end;

如您所见,这两个函数都使用以下方法构建指定大小的字符串 相同的 BuildString 函数和 return 它分别作为流和字符串。

在这里的两个 Win10 系统上,GetStream 适用于最大 30716 字节的大小,但 在此之上,它 return 是一个空流,“大小”为 -1。

Otoh,GetString 适用于我测试过的所有尺寸,包括 大小为 32000000。我还没有设法追踪 GetStream 失败的原因。 但是,基于 GetString 确实 工作的观察,我测试了 在 work-around 之后,它将流作为字符串发送,并且运行良好 也为 32M:

function TServerMethods1.GetStreamAsString(Size: Integer): String;
var
  S : TStream;
  SS : TStringStream;
begin
  S := GetStream(Size);
  S.Position := 0;
  SS := TStringStream.Create;
  SS.CopyFrom(S, S.Size);
  SS.Position := 0;
  Result := SS.DataString;
  SS.Free;
  S.Free;
end;

我很感激你可能更喜欢你自己的 work-around 以块的形式发送结果。

顺便说一句,我尝试通过在服务器主窗体的方法中创建 TServerMethods 的实例并直接从中调用 GetStream 来在服务器上调用我的 GetStream,所以不涉及服务器的 TDSTCPServerTransport。这正确 return 流所以问题似乎出在传输层或输入 and/or 输出接口。

我编写了解决方法。看到我不能传递大于 255Kb 的数据,然后我将它分成不同的 255Kb 数据包并分别发送(我还添加了压缩以最小化带宽和往返)。

在服务器上,我已将 StremGet 更改为两个不同的调用:StreamGet 和 StreamGetNextPacket。

function TServerMethods.StreamGet(var Complete: boolean): TStream;
var Data: TMemoryStream;
    Compression: TZCompressionStream;
begin
  try
    // Opening Data
    qCustomers.Close;
    qCustomers.Open;
    qOrders.Close;
    qOrders.Open;

    // Compressing Data
    try
      if Assigned(CommStream) then FreeAndNil(CommStream);
      CommStream := TMemoryStream.Create;
      Data := TMemoryStream.Create;
      Compression := TZCompressionStream.Create(CommStream);
      FDSchemaAdapter.SaveToStream(Data, TFDStorageFormat.sfBinary);
      Data.Position := 0;
      Compression.CopyFrom(Data, Data.Size);
    finally
      Data.Free;
      Compression.Free;
    end;

    // Returning First 260000 bytes Packet
    CommStream.Position := 0;
    Result := TMemoryStream.Create;
    Result.CopyFrom(CommStream, Min(CommStream.Size, 260000));
    Result.Position := 0;

    // Freeing Memory if all sent
    Complete := (CommStream.Position = CommStream.Size);
    if Complete then FreeAndNil(CommStream);
  except
    raise;
  end;
end;

function TServerMethods.StreamGetNextPacket(var Complete: boolean): TStream;
begin
  // Returning the rest of 260000 bytes Packets
  Result := TMemoryStream.Create;
  Result.CopyFrom(CommStream, Min(CommStream.Size - CommStream.Position, 260000));
  Result.Position := 0;

  // Freeing Memory if all sent
  Complete := (CommStream.Position = CommStream.Size);
  if Complete then FreeAndNil(CommStream);
end;

CommStream:TStream 在 TServerMethods 上声明为私有。

客户端以这种方式检索它:

procedure TClientForm.GetTables;
var Complete: boolean;
    Input: TStringStream;
    Data: TMemoryStream;
    Decompression:  TZDecompressionStream;
begin
  Input := nil;
  Data := nil;
  Decompression := nil;

  try
    // Get the First 260000 bytes Packet
    spStreamGet.ExecProc;
    Input := TStringStream.Create(spStreamGet.ParamByName('ReturnValue').AsBlob);
    Complete := spStreamGet.ParamByName('Complete').AsBoolean;

    // Get the rest of 260000 bytes Packets
    while not Complete do begin
      spStreamGetNextPacket.ExecProc;
      Input.Position := Input.Size;
      Input.WriteBuffer(TBytes(spStreamGetNextPacket.ParamByName('ReturnValue').AsBlob), Length(spStreamGetNextPacket.ParamByName('ReturnValue').AsBlob));
      Complete := spStreamGetNextPacket.ParamByName('Complete').AsBoolean;
    end;

    // Decompress Data
    Input.Position := 0;
    Data := TMemoryStream.Create;
    Decompression := TZDecompressionStream.Create(Input);
    Data.CopyFrom(Decompression, 0);
    Data.Position := 0;

    // Load Datasets
    DataModuleFDClient.FDSchemaAdapter.LoadFromStream(Data, TFDStorageFormat.sfBinary);
  finally
    if Assigned(Input) then FreeAndNil(Input);
    if Assigned(Data) then FreeAndNil(Data);
    if Assigned(Decompression) then FreeAndNil(Decompression);
  end;
end;

现在可以正常使用了。

问题似乎既不是 TStream class 也不是底层 DataSnap 通信基础结构,而是 TFDStoredProc 组件创建了 ftBlob 类型的 return 参数。首先,将输出参数从 ftBlob 更改为 ftStream。然后,将 GetTables 过程更改为:

procedure  TClientForm.GetTables;
var
  LStringStream: TStream;
begin
  spStreamGet.ExecProc;
  LStringStream := spStreamGet.Params[0].AsStream;
  LStringStream.Position := 0;
  DataModuleFDClient.FDSchemaAdapter.LoadFromStream(LStringStream, 
  TFDStorageFormat.sfBinary);
end;

@Marc:我认为 Henrikki 的意思是单个函数,而不是单个函数调用...
我已经修改了你的代码,这样只有一个功能就足够了,这样就可以使用具有不同 SchemaAdapters/StoredProcedures 的项目。
最大流大小被声明为常量 (MaxDataSnapStreamSize) 并设置为 $F000,这是 MaxBuffSize 一个 TStream.CopyFrom 函数句柄(参见 System.Classes)。
FComprStream 是 TMemorySTream 类型的私有字段,在服务器模块的构造函数和析构函数中处理。

在服务器端:

const
  MaxDataSnapStreamSize = $F000;

function TServerMethods1.StreamGet(const aFDSchemaAdapter: TFDSchemaAdapter; var aSize: Int64): TStream;
var
  lZIPStream: TZCompressionStream;
  lDataStream: TMemoryStream;
  I: Integer;
  lMinSize: Int64;
begin
if aSize=-1 then
  exit;
lDataStream:=TMemoryStream.Create;
  try
  if aSize=0 then
    begin
    FComprStream.Clear;
    with aFDSchemaAdapter do
      for I := 0 to Count-1 do
        begin
        DataSets[I].Close;
        DataSets[I].Open;
        end;
    lZIPStream := TZCompressionStream.Create(TCompressionLevel.clFastest, FComprStream);
      try
      aFDSchemaAdapter.SaveToStream(lDataStream, TFDStorageFormat.sfBinary);
      lDataStream.Position := 0;
      lZIPStream.CopyFrom(lDataStream, lDataStream.Size);
      finally
      lDataStream.Clear;
      lZIPStream.Free;
      end;
    lMinSize:=Min(FComprStream.Size, MaxDataSnapStreamSize);
    FComprStream.Position:=0;
    end
  else
    lMinSize:=Min(aSize, MaxDataSnapStreamSize);

  lDataStream.CopyFrom(FComprStream, lMinSize);
  lDataStream.Position := 0;
  aSize:=FComprStream.Size-FComprStream.Position;
  Result:=lDataStream;
  if aSize=0 then
    FComprStream.Clear;
  except
  aSize:=-1;
  lDataStream.Free;
  raise;
  end;
end;

在客户端:

procedure TdmClientModuleDS.GetTables(const aStPrGet: TFDStoredProc; const aFDSchemaAdapter: TFDSchemaAdapter);
var
  lSize: Int64;
  lZIPStream: TStringStream;
  lDataStream: TMemoryStream;
  lUNZIPStream:  TZDecompressionStream;
  I: Integer;
begin
  try
  lSize:=0;
  for I := 0 to aFDSchemaAdapter.Count-1 do
    aFDSchemaAdapter.DataSets[I].Close;
  aStPrGet.ParamByName('aSize').AsInteger:=0;
  aStPrGet.ExecProc;
  lZIPStream:=TStringStream.Create(aStPrGet.ParamByName('ReturnValue').AsBlob);
  lSize:=aStPrGet.ParamByName('aSize').AsInteger;
  while lSize>0 do
    with aStPrGet do
      begin
      ParamByName('aSize').AsInteger:=lSize;
      ExecProc;
      lZIPStream.Position:=lZIPStream.Size;
      lZIPStream.WriteBuffer(TBytes(ParamByName('ReturnValue').AsBlob),Length(ParamByName('ReturnValue').AsBlob));
      lSize:=ParamByName('aSize').AsInteger;
      end;
  lZIPStream.Position:=0;
  lDataStream:=TMemoryStream.Create;
  lUNZIPStream:=TZDecompressionStream.Create(lZIPStream);
  lDataStream.CopyFrom(lUNZIPStream, 0);
  lDataStream.Position:=0;
  aFDSchemaAdapter.LoadFromStream(lDataStream,TFDStorageFormat.sfBinary);
  finally
  if Assigned(lZIPStream) then
    FreeAndNil(lZIPStream);
  if Assigned(lDataStream) then
    FreeAndNil(lDataStream);
  if Assigned(lUNZIPStream) then
    FreeAndNil(lUNZIPStream);
  end;
end;