WCF streaming a large number of objects

I have a WCF service that queries a database and returns a large number of records. There are so many records that the server runs out of memory before it can return them.

So I want to send the records back as they are fetched from the database, or in batches of a set size.

To be clearer: I cannot collect the fetched records into a collection on the server, because the server runs out of memory before I have collected them all. I want to try to send them back one by one, or in chunks, within a single call.

For example, in chunks:

  1. Fetch first 1000 records
  2. Add to collection
  3. Send collection to client
  4. Clear collection
  5. Fetch next 1000 records, and repeat from step 2

So my idea was that the web service code would look something like this:

public IEnumerable<Customer> GetAllCustomers()
{
     // Set up the query
     string query = PrepareQuery();

     // Create the connection
     using (var connection = new SqlConnection(ConnectionString))
     {
         connection.Open();

         var sqlcommand = connection.CreateCommand();
         sqlcommand.CommandText = query;

         // Read the results, yielding each record as it is read
         using (var reader = sqlcommand.ExecuteReader())
         {
             while (reader.Read())
             {
                 Customer customer = new Customer();
                 foreach (var column in Columns)
                 {
                     int fieldIndex = reader.GetOrdinal(column.Name);
                     customer[column.Name] = reader.GetValue(fieldIndex);
                 }

                 yield return customer;
             }
         }
     }
}

I don't want to consider paging, because Order By is slow on SQL Server.

I am looking for a way to do this in WCF.

I think you answered your own question. There are two ways to do it: streamed or chunked.

You can do streaming in WCF - see https://docs.microsoft.com/en-us/dotnet/framework/wcf/feature-details/large-data-and-streaming

You get a stream to write to, so you need to handle yourself how the data is encoded on that stream and how the client decodes it.
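
For example, one common approach is length-prefix framing: serialize each record, then write a 4-byte length followed by the payload, so the client can split the stream back into records. A minimal sketch, assuming Customer is a data-contract type (the helper name is illustrative, not a WCF API):

// Hedged sketch: frames one record as [4-byte length][payload].
static byte[] EncodeRecord(Customer customer)
{
    using (var ms = new MemoryStream())
    {
        // serialize one record with the data contract serializer
        new DataContractSerializer(typeof(Customer)).WriteObject(ms, customer);
        byte[] payload = ms.ToArray();

        // prepend a 4-byte length prefix so the client knows where
        // this record ends and the next one begins
        byte[] framed = new byte[4 + payload.Length];
        BitConverter.GetBytes(payload.Length).CopyTo(framed, 0);
        payload.CopyTo(framed, 4);
        return framed;
    }
}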

The other option is chunking/paging. You just modify your service so that it accepts, for example, a page number or some other way of indicating which page is needed.

Which one you do depends on the application: How much data? What is the nature of the clients? Can some field be used for paging? Etc.
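
If you go the chunking route, the contract change is small: add paging parameters and let the client call repeatedly until it receives an empty page. A minimal sketch (names are hypothetical):

[ServiceContract]
public interface ICustomerService
{
    // Hypothetical paged operation: an empty result signals
    // that there are no more records to fetch.
    [OperationContract]
    List<Customer> GetCustomersPage(int pageNumber, int pageSize);
}

Since the question notes that Order By is slow, keyset-style paging (passing the last key seen instead of a page number and querying WHERE Key > @lastKey on an indexed column) is a common way to avoid the expensive sort, assuming such a key exists.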

Here is some pseudo-code for building a stream that can do this on the server side. It is based on the example here: https://docs.microsoft.com/en-us/dotnet/framework/wcf/feature-details/how-to-enable-streaming

I am not writing complete compilable code for you, but this is the gist of it.

In the server:

public Stream GetBigData()
{
    return new BigDataStream();
}
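
For the response to actually be streamed rather than buffered, the operation must return a Stream (or a Message) and the binding must use a streamed transfer mode. Roughly (a sketch, not the exact contract from the linked article):

[ServiceContract]
public interface IBigDataService
{
    // Returning Stream lets WCF send the body as it is produced
    // instead of buffering the whole response in memory.
    [OperationContract]
    Stream GetBigData();
}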

BigDataStream (unimplemented methods not shown):

class BigDataStream : Stream
{
    // the DataReader that the records are read from
    SqlDataReader _reader;

    public BigDataStream()
    {
        // open DB connection
        // run your query
        // get a DataReader into _reader
    }

    // you need a buffer to encode your data between calls to Read
    List<byte> _encodeBuffer = new List<byte>();

    public override int Read(byte[] buffer, int offset, int count)
    {
        // read from the DataReader and populate the _encodeBuffer
        // until the _encodeBuffer contains at least count bytes
        // (or until there are no more records)
        // for example:

        while (_encodeBuffer.Count < count && _reader.Read())
        {
            // (1)
            // encode the record into a byte array. How to do this?
            // you can read into a class and then use the data 
            // contract serialization for example. If you do this, you
            // will probably find it easier to prepend an integer which
            // specifies the length of the following encoded message. 
            // This will make it easier for the client to deserialize it.

            // (2)
            // append the encoded record bytes (plus any length prefix 
            // etc) to _encodeBuffer
        }

        // remove up to the first count bytes from _encodeBuffer
        // and copy them into buffer at the requested offset
        int bytesToCopy = Math.Min(count, _encodeBuffer.Count);
        _encodeBuffer.CopyTo(0, buffer, offset, bytesToCopy);
        _encodeBuffer.RemoveRange(0, bytesToCopy);

        // return the number of bytes copied (0 signals end of stream)
        return bytesToCopy;
    }

    public override void Close()
    {
        // close the reader + db connection
        base.Close();
    }

}
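
On the client side, the matching decode loop could look something like this sketch, assuming the length-prefixed data contract encoding suggested in the comments above (ReadRecords and ReadExactly are illustrative helpers, not WCF APIs):

static IEnumerable<Customer> ReadRecords(Stream stream)
{
    var serializer = new DataContractSerializer(typeof(Customer));
    var lengthBytes = new byte[4];

    // each record is framed as [4-byte length][payload]
    while (ReadExactly(stream, lengthBytes, 4))
    {
        var payload = new byte[BitConverter.ToInt32(lengthBytes, 0)];
        if (!ReadExactly(stream, payload, payload.Length))
            yield break; // truncated stream

        using (var ms = new MemoryStream(payload))
            yield return (Customer)serializer.ReadObject(ms);
    }
}

// Stream.Read may return fewer bytes than requested, so loop until
// the buffer is filled; false means the stream ended cleanly first.
static bool ReadExactly(Stream stream, byte[] buffer, int count)
{
    int total = 0;
    while (total < count)
    {
        int read = stream.Read(buffer, total, count - total);
        if (read == 0)
            return false;
        total += read;
    }
    return true;
}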

Thanks to mikelegg & Reniuz for helping find the solution. I wish I could give them the check mark for the correct answer, but I am afraid the next developer who reads this question would not fully benefit. So here is what I ended up with.

  1. Set up the server and client configuration files (following the link: Large Data and Streaming); see the sketch after this list
  2. Followed this solution; the source code can be downloaded from here
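
For reference, the key settings from that article are the streamed transfer mode and a raised message size limit on the binding. Shown here in code form rather than app.config, with illustrative values that need tuning:

// Sketch of the binding setup that the configuration files express;
// in app.config these map to the transferMode and
// maxReceivedMessageSize attributes on the binding element.
var binding = new BasicHttpBinding
{
    TransferMode = TransferMode.StreamedResponse,
    MaxReceivedMessageSize = long.MaxValue, // illustrative; pick a real limit
    SendTimeout = TimeSpan.FromMinutes(10)  // large transfers take time
};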

I had to change the DBRowStream.DBThreadProc method slightly to get it to work, so I am posting the source code:

DBRowStream Class:

    void DBThreadProc(object o)
    {
        SqlConnection con = null;
        SqlCommand com = null;

        try
        {
            con = new System.Data.SqlClient.SqlConnection(/*ConnectionString*/);
            com = new SqlCommand();
            com.Connection = con;
            com.CommandText = PrepareQuery();
            con.Open();
            SqlDataReader reader = com.ExecuteReader();

            int count = 0;

            MemoryStream memStream = memStream1;
            memStreamWriteStatus = 1;
            readyToWriteToMemStream1.WaitOne();

            while (reader.Read())
            {
                // Populate
                Customer customer = new Customer();
                foreach (var column in Columns)
                {
                    int fieldIndex = reader.GetOrdinal(column);
                    object value = reader.GetValue(fieldIndex);
                    customer[column.Name] = value;
                }                   

                // Serialize: I used a custom Serializer 
                // but BinaryFormatter should be fine
                DBDataFormatter.Serialize(memStream, customer);

                count++;

                if (count == PAGESIZE) // const int PAGESIZE = 10000
                {
                    switch (memStreamWriteStatus)
                    {
                        case 1: // done writing to stream 1
                            {
                                memStream1.Position = 0;
                                readyToSendFromMemStream1.Set();
                                // write stream 1 is done...waiting for stream 2 
                                readyToWriteToMemStream2.WaitOne();
                                memStream = memStream2;
                                memStream.Position = 0;
                                memStream.SetLength(0); // Added: reset the stream, otherwise garbage data came back
                                memStreamWriteStatus = 2;

                                break;
                            }
                        case 2: // done writing to stream 2
                            {
                                memStream2.Position = 0;
                                readyToSendFromMemStream2.Set();
                                // Write on stream 2 is done...waiting for stream 1
                                readyToWriteToMemStream1.WaitOne();
                                // done waiting for stream 1 
                                memStream = memStream1;
                                memStreamWriteStatus = 1;
                                memStream.Position = 0;
                                memStream.SetLength(0); // Added: reset the stream, otherwise garbage data came back

                                break;
                            }
                    }
                    count = 0;
                }
            }

            if (count > 0)
            {
                switch (memStreamWriteStatus)
                {
                    case 1: // done writing to stream 1
                        {
                            memStream1.Position = 0;
                            readyToSendFromMemStream1.Set();
                            // END write stream 1 is done...waiting for stream 2 
                            break;
                        }
                    case 2: // done writing to stream 2
                        {
                            memStream2.Position = 0;
                            readyToSendFromMemStream2.Set();
                            // END write stream 2 is done...waiting for stream 1 
                            break;
                        }
                }
            }
            bDoneWriting = true;
            bCanRead = false;
        }
        finally
        {
            if (com != null)
            {
                com.Dispose();
                com = null;
            }
            if (con != null)
            {
                con.Close();
                con.Dispose();
                con = null;
            }
        }
    }
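
For context, DBThreadProc relies on fields declared elsewhere in the downloadable DBRowStream class. Reconstructed roughly from the usage above (the event types and initial states are assumptions):

// Two buffers that the writer thread and the stream's Read side
// ping-pong between, with events to hand ownership back and forth.
MemoryStream memStream1 = new MemoryStream();
MemoryStream memStream2 = new MemoryStream();
int memStreamWriteStatus;          // 1 or 2: which buffer is being written

AutoResetEvent readyToWriteToMemStream1 = new AutoResetEvent(true);
AutoResetEvent readyToWriteToMemStream2 = new AutoResetEvent(false);
AutoResetEvent readyToSendFromMemStream1 = new AutoResetEvent(false);
AutoResetEvent readyToSendFromMemStream2 = new AutoResetEvent(false);

volatile bool bDoneWriting;        // writer has produced all records
volatile bool bCanRead = true;     // reader may keep reading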

And then the client side:

private static void TestGetRecordsAndDump()
{
    const string FILE_NAME = "Records.CSV";
    File.Delete(FILE_NAME);
    var file = File.AppendText(FILE_NAME);
    long count = 0;
    try
    {
        ServiceReference1.DataServiceClient service = new ServiceReference1.DataServiceClient();
        var stream = service.GetDBRowStream();

        Console.WriteLine("Records Retrieved : ");
        Console.WriteLine("File Size (MB)    : ");

        var canDoLastRead = true;
        while (stream.CanRead && canDoLastRead)
        {
           try
           {
               Customer customer = DBDataFormatter.Deserialize(stream); // Used custom Deserializer, but BinaryFormatter should be fine

               file.Write(customer.ToString());

               count++;
           }
           catch
           {
                canDoLastRead = false; // Bug: stream.CanRead is not set to false at the end of the stream, so this trick detects when all records have been returned.
           }
           finally
           {
               Console.SetCursorPosition("Records Retrieved : ".Length, 0);
               Console.Write(string.Format("{0}               ", count));
               Console.SetCursorPosition("File Size (MB)    : ".Length, 1);
               Console.Write(string.Format("{0:G}             ", file.BaseStream.Length / 1024f / 1024f));     
           }
        }
    }
    finally
    {
        file.Close();
    }
}

There is one bug I can't seem to solve: stream.CanRead is not set to false after all the records have been returned. I never figured out why, but at least for now I can query large data sets and return all the records without the server or the client running out of memory.
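
A possible way around that bug (a sketch, not tested against this code) is to make the end of the data explicit in the encoding instead of relying on CanRead: if each record carries a length prefix as suggested earlier, the server can write a zero length after the last record and the client can stop on it:

// Server, after the while (reader.Read()) loop: write an explicit
// end-of-data marker (a zero length prefix).
memStream.Write(BitConverter.GetBytes(0), 0, 4);

// Client, before deserializing each record: a zero length prefix
// means there are no more records.
int length = BitConverter.ToInt32(lengthBytes, 0);
if (length == 0)
    break;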