WCF streaming a large number of objects
I have a WCF service that queries a database and returns a very large number of records. There are so many records that the server runs out of memory before it can return them all.
So I want to send the records back as I fetch them from the database, or in batches of a set size.
To be clearer: I cannot collect the fetched records into a collection on the server, because the server runs out of memory before I have collected them all. I want to send them back one by one, or in chunks, within a single call.
For example, in chunks:
- Fetch first 1000 records
- Add to collection
- Send collection to client
- Clear collection
- Fetch next 1000 records, and repeat from step 2
So my idea is that the web service code would look something like this:
public IEnumerable<Customer> GetAllCustomers()
{
    // Set up the query
    string query = PrepareQuery();

    // Create and open the connection
    using (var connection = new SqlConnection(ConnectionString))
    using (var sqlcommand = connection.CreateCommand())
    {
        sqlcommand.CommandText = query;
        connection.Open();

        // Read the results, yielding one record at a time
        using (var reader = sqlcommand.ExecuteReader())
        {
            while (reader.Read())
            {
                var customer = new Customer();
                foreach (var column in Columns)
                {
                    int fieldIndex = reader.GetOrdinal(column.Name);
                    object value = reader.GetValue(fieldIndex);
                    customer[column.Name] = value;
                }
                yield return customer;
            }
        }
    }
}
I don't want to consider paging, because ORDER BY is slow on the SQL server.
Looking for a way to do this in WCF.
I think you have answered your own question. There are two approaches: streaming or chunking.
You can stream in WCF - see https://docs.microsoft.com/en-us/dotnet/framework/wcf/feature-details/large-data-and-streaming
With streaming you have a stream to write to, so you have to handle yourself how the data is encoded on that stream and how it is decoded on the client.
The other option is chunking/paging. You simply modify your service so that it accepts, for example, a page number or some other way to indicate which page is needed.
Which one you choose depends on the application: how much data is there? What is the nature of the clients? Can some field be used for paging? And so on.
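To illustrate the chunking option, here is a minimal sketch (the names ICustomerService and ConnectionString, and the Customer Id/Name columns, are placeholder assumptions, not from the question). It uses keyset paging on an indexed Id column, so the ORDER BY is an index seek rather than a sort over the whole result set:

using System.Collections.Generic;
using System.Data.SqlClient;
using System.ServiceModel;

[ServiceContract]
public interface ICustomerService
{
    // The client passes the last Id it has seen;
    // an empty result means there are no more pages.
    [OperationContract]
    List<Customer> GetCustomersAfter(int lastId, int pageSize);
}

public class CustomerService : ICustomerService
{
    const string ConnectionString = "..."; // placeholder

    public List<Customer> GetCustomersAfter(int lastId, int pageSize)
    {
        var page = new List<Customer>();
        using (var connection = new SqlConnection(ConnectionString))
        using (var command = connection.CreateCommand())
        {
            // Keyset paging: seek past the last key instead of using
            // OFFSET, so each page does not re-sort earlier rows.
            command.CommandText =
                "SELECT TOP (@pageSize) Id, Name FROM Customers " +
                "WHERE Id > @lastId ORDER BY Id";
            command.Parameters.AddWithValue("@pageSize", pageSize);
            command.Parameters.AddWithValue("@lastId", lastId);
            connection.Open();
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    page.Add(new Customer
                    {
                        Id = reader.GetInt32(0),
                        Name = reader.GetString(1)
                    });
                }
            }
        }
        return page;
    }
}

The client then loops: call GetCustomersAfter(lastId, 1000), process the page, set lastId to the Id of the last record, and stop when a page comes back empty.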
Here is some pseudocode for a server-side stream that can do this. It is based on the example here: https://docs.microsoft.com/en-us/dotnet/framework/wcf/feature-details/how-to-enable-streaming
I am not going to write complete compilable code for you, but this is the gist of it.
On the server:
public Stream GetBigData()
{
    return new BigDataStream();
}
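Note that for the reply to actually be streamed rather than buffered, the operation must return a Stream as the sole body of the reply, and the binding must use streamed transfer (see the config sketch further down). A minimal contract sketch, with IBigDataService as a hypothetical name:

using System.IO;
using System.ServiceModel;

[ServiceContract]
public interface IBigDataService
{
    // WCF only streams when the return value (or the single body
    // parameter) is a Stream; anything else gets buffered.
    [OperationContract]
    Stream GetBigData();
}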
BigDataStream (unimplemented methods not shown):
class BigDataStream : Stream
{
    // the DataReader that the constructor sets up
    SqlDataReader _reader;

    public BigDataStream()
    {
        // open DB connection
        // run your query
        // get a DataReader
    }

    // you need a buffer to encode your data between calls to Read
    List<byte> _encodeBuffer = new List<byte>();

    public override int Read(byte[] buffer, int offset, int count)
    {
        // read from the DataReader and populate the _encodeBuffer
        // until the _encodeBuffer contains at least count bytes
        // (or until there are no more records)
        // for example:
        while (_encodeBuffer.Count < count && _reader.Read())
        {
            // (1)
            // encode the record into a byte array. How to do this?
            // you can read into a class and then use data contract
            // serialization, for example. If you do this, you will
            // probably find it easier to prepend an integer which
            // specifies the length of the following encoded message.
            // This will make it easier for the client to deserialize it.
            // (2)
            // append the encoded record bytes (plus any length prefix
            // etc) to _encodeBuffer
        }

        // remove up to the first count bytes from _encodeBuffer
        // and copy them into buffer at the offset requested
        // return the number of bytes added
    }

    public override void Close()
    {
        // close the reader + db connection
        base.Close();
    }
}
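For reference, here is a hedged sketch of what the body of Read could look like with the length-prefix scheme the comments describe. DataContractSerializer and the MapCurrentRow helper are my assumptions, not part of the original answer (it needs using System.IO and System.Runtime.Serialization):

public override int Read(byte[] buffer, int offset, int count)
{
    // a single serializer instance could be cached in a field instead
    var serializer = new DataContractSerializer(typeof(Customer));

    while (_encodeBuffer.Count < count && _reader.Read())
    {
        // MapCurrentRow is a hypothetical helper that copies the
        // current DataReader row into a Customer instance
        Customer customer = MapCurrentRow(_reader);

        byte[] body;
        using (var ms = new MemoryStream())
        {
            serializer.WriteObject(ms, customer);
            body = ms.ToArray();
        }

        // length prefix so the client knows where each record ends
        _encodeBuffer.AddRange(BitConverter.GetBytes(body.Length));
        _encodeBuffer.AddRange(body);
    }

    // drain up to count bytes into the caller's buffer;
    // returning 0 is how the stream signals end-of-data to WCF
    int copied = Math.Min(count, _encodeBuffer.Count);
    _encodeBuffer.CopyTo(0, buffer, offset, copied);
    _encodeBuffer.RemoveRange(0, copied);
    return copied;
}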
Thanks to mikelegg & Reniuz for helping find the solution. I wish I could give them the tick for the correct answer, but I worry that the next developer reading this question would not benefit fully from it. So here is what I ended up with.
- Set up the config files for the server and the client (following the link: Large Data and Streaming); see the config sketch below
- Followed this solution, can download source code from here
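For completeness, the relevant server-side binding configuration looks roughly like this (service/contract names and the limits are placeholders):

<system.serviceModel>
  <bindings>
    <basicHttpBinding>
      <!-- Streamed transfer so replies are not buffered in server memory -->
      <binding name="streamedBinding"
               transferMode="Streamed"
               maxReceivedMessageSize="2147483647"
               sendTimeout="00:10:00" />
    </basicHttpBinding>
  </bindings>
  <services>
    <service name="MyNamespace.DataService">
      <endpoint address=""
                binding="basicHttpBinding"
                bindingConfiguration="streamedBinding"
                contract="MyNamespace.IDataService" />
    </service>
  </services>
</system.serviceModel>

The client config needs the matching transferMode="Streamed" on its binding as well.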
I had to change the DBRowStream.DBThreadProc method slightly to get it to work, so I post the source code:
DBRowStream class:
void DBThreadProc(object o)
{
    SqlConnection con = null;
    SqlCommand com = null;
    try
    {
        con = new System.Data.SqlClient.SqlConnection(/*ConnectionString*/);
        com = new SqlCommand();
        com.Connection = con;
        com.CommandText = PrepareQuery();
        con.Open();
        SqlDataReader reader = com.ExecuteReader();
        int count = 0;

        MemoryStream memStream = memStream1;
        memStreamWriteStatus = 1;
        readyToWriteToMemStream1.WaitOne();
        while (reader.Read())
        {
            // Populate
            Customer customer = new Customer();
            foreach (var column in Columns)
            {
                int fieldIndex = reader.GetOrdinal(column.Name);
                object value = reader.GetValue(fieldIndex);
                customer[column.Name] = value;
            }

            // Serialize: I used a custom serializer,
            // but BinaryFormatter should be fine
            DBDataFormatter.Serialize(memStream, customer);

            count++;

            if (count == PAGESIZE) // const int PAGESIZE = 10000
            {
                switch (memStreamWriteStatus)
                {
                    case 1: // done writing to stream 1
                    {
                        memStream1.Position = 0;
                        readyToSendFromMemStream1.Set();
                        // write to stream 1 is done... waiting for stream 2
                        readyToWriteToMemStream2.WaitOne();
                        memStream = memStream2;
                        memStream.Position = 0;
                        memStream.SetLength(0); // added: reset the stream, otherwise garbage data came back
                        memStreamWriteStatus = 2;
                        break;
                    }
                    case 2: // done writing to stream 2
                    {
                        memStream2.Position = 0;
                        readyToSendFromMemStream2.Set();
                        // write to stream 2 is done... waiting for stream 1
                        readyToWriteToMemStream1.WaitOne();
                        // done waiting for stream 1
                        memStream = memStream1;
                        memStreamWriteStatus = 1;
                        memStream.Position = 0;
                        memStream.SetLength(0); // added: reset the stream, otherwise garbage data came back
                        break;
                    }
                }
                count = 0;
            }
        }

        if (count > 0)
        {
            switch (memStreamWriteStatus)
            {
                case 1: // done writing to stream 1
                {
                    memStream1.Position = 0;
                    readyToSendFromMemStream1.Set();
                    // END: write to stream 1 is done
                    break;
                }
                case 2: // done writing to stream 2
                {
                    memStream2.Position = 0;
                    readyToSendFromMemStream2.Set();
                    // END: write to stream 2 is done
                    break;
                }
            }
        }
        bDoneWriting = true;
        bCanRead = false;
    }
    finally
    {
        if (com != null)
        {
            com.Dispose();
            com = null;
        }
        if (con != null)
        {
            con.Close();
            con.Dispose();
            con = null;
        }
    }
}
And then the client:
private static void TestGetRecordsAndDump()
{
    const string FILE_NAME = "Records.CSV";
    File.Delete(FILE_NAME);
    var file = File.AppendText(FILE_NAME);
    long count = 0;
    try
    {
        var service = new ServiceReference1.DataServiceClient();
        var stream = service.GetDBRowStream();
        Console.WriteLine("Records Retrieved : ");
        Console.WriteLine("File Size (MB) : ");
        var canDoLastRead = true;
        while (stream.CanRead && canDoLastRead)
        {
            try
            {
                // Used a custom deserializer, but BinaryFormatter should be fine
                Customer customer = DBDataFormatter.Deserialize(stream);
                file.Write(customer.ToString());
                count++;
            }
            catch
            {
                // Bug: stream.CanRead is not set to false at the end of the
                // stream, so this trick detects when all records have been returned.
                canDoLastRead = false;
            }
            finally
            {
                Console.SetCursorPosition("Records Retrieved : ".Length, 0);
                Console.Write(string.Format("{0} ", count));
                Console.SetCursorPosition("File Size (MB) : ".Length, 1);
                Console.Write(string.Format("{0:G} ", file.BaseStream.Length / 1024f / 1024f));
            }
        }
    }
    finally
    {
        file.Close();
    }
}
There is one bug I can't seem to solve: stream.CanRead is never set to false once all the records have been returned. I never figured out why, but at least for now I can query large data sets and return all the records without the server or the client running out of memory.
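One possible way around the CanRead issue, which I have not tested against the downloaded solution (so treat it as a sketch): have the server prefix every serialized record with its byte length and write a zero length after the last record. The client can then stop on the sentinel instead of relying on a deserialization failure:

// Client-side read loop under the hypothetical length-prefix framing;
// the server would need matching changes where it serializes records.
var lengthBytes = new byte[4];
while (ReadExactly(stream, lengthBytes, 4))
{
    int length = BitConverter.ToInt32(lengthBytes, 0);
    if (length == 0)
        break; // sentinel: server has no more records

    var body = new byte[length];
    if (!ReadExactly(stream, body, length))
        break; // stream ended unexpectedly mid-record

    Customer customer = DBDataFormatter.Deserialize(new MemoryStream(body));
    file.Write(customer.ToString());
    count++;
}

// Stream.Read may return fewer bytes than requested, so loop until
// the buffer is full or the underlying stream ends.
static bool ReadExactly(Stream stream, byte[] buffer, int count)
{
    int total = 0;
    while (total < count)
    {
        int read = stream.Read(buffer, total, count - total);
        if (read == 0)
            return false;
        total += read;
    }
    return true;
}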