使用 CLR 和 GZIP 压缩行集

Compressing row set using CLR and GZIP

我想压缩一些大表,其中包含很少读取或根本不会读取的历史数据。我首先尝试了内置压缩(row、page、column stored、column-stored archive),但它们都不能压缩行外值(varchar(max)、nvarchar(max)),所以最后尝试使用 CLR 解决方案。

SQL Server Compressed Rowset Sample 解决方案使用用户定义的 CLR 类型压缩给定查询返回的整个行集。


    ,[Data] [dbo].[CompressedRowset]

-- Pass the query text to dbo.CompressQueryResults and store the single value it returns
-- in the [Data] column of Archive.
INSERT INTO Archive([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')


这是完整的 .net 代码:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

        ,IsByteOrdered = false
        ,IsFixedLength = false
        ,MaxByteSize = -1
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
    DataTable rowset;

    /// <summary>
    /// Gets or sets the DataTable held in the private <c>rowset</c> field.
    /// </summary>
    public DataTable Data
        get { return this.rowset; }
        set { this.rowset = value; }

    /// <summary>
    /// Returns the text collected by a StringWriter that is wrapped in an XmlTextWriter.
    /// </summary>
    // NOTE(review): no statement in this listing writes to xw before sw.ToString() is returned;
    // presumably a call that serializes the rowset as XML was lost from the listing - confirm.
    public override string ToString()
        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
            return sw.ToString();

    /// <summary>
    /// True when no DataTable has been assigned to <c>rowset</c>.
    /// </summary>
    public bool IsNull
        get { return (this.rowset == null);}

    /// <summary>
    /// Returns a newly constructed CompressedRowset. Its <c>rowset</c> is never assigned here,
    /// so <c>IsNull</c> reports true for the returned value.
    /// </summary>
    public static CompressedRowset Null
            CompressedRowset h = new CompressedRowset();
            return h;

    /// <summary>
    /// Creates a CompressedRowset from a SQL string by opening an XmlTextReader over <c>s.Value</c>.
    /// </summary>
    /// <param name="s">Text to parse; <c>s.Value</c> is read without checking <c>s.IsNull</c> first.</param>
    /// <returns>The newly constructed CompressedRowset.</returns>
    // NOTE(review): nothing in this listing reads from xr into c, so c is returned as constructed;
    // presumably a ReadXml(xr) call was lost from the listing - confirm.
    public static CompressedRowset Parse(SqlString s)
        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
            var c = new CompressedRowset();
            return c;

    #region "Stream Wrappers"
    /// <summary>
    /// Abstract Stream base for the adapter streams in this file. It reports that seeking and
    /// writing are unsupported, and throws NotImplementedException from Length, Position,
    /// Seek and SetLength.
    /// </summary>
    abstract class WrapperStream : Stream
        // Seeking is not supported.
        public override bool CanSeek
            get { return false; }

        // Not writable by default; a derived class may override this.
        public override bool CanWrite
            get { return false; }

        // No statements are shown for Flush in this listing.
        public override void Flush()


        public override long Length
            get { throw new NotImplementedException(); }

        // The two throws below are presumably the get and set accessor bodies
        // (the accessor keywords are not visible in this listing).
        public override long Position
                throw new NotImplementedException();
                throw new NotImplementedException();

        public override long Seek(long offset, SeekOrigin origin)
            throw new NotImplementedException();

        public override void SetLength(long value)
            throw new NotImplementedException();


    /// <summary>
    /// Write-only stream that forwards each Write call to the wrapped BinaryWriter.
    /// Read throws NotImplementedException.
    /// </summary>
    class BinaryWriterStream : WrapperStream
        // The writer that receives all bytes written to this stream.
        BinaryWriter br;
        public BinaryWriterStream(BinaryWriter br)
            this.br = br;
        public override bool CanRead
            get { return false; }
        public override bool CanWrite
            get { return true; }
        public override int Read(byte[] buffer, int offset, int count)
            throw new NotImplementedException();
        // Passes the buffer slice straight through to the underlying writer.
        public override void Write(byte[] buffer, int offset, int count)
            br.Write(buffer, offset, count);

    /// <summary>
    /// Read-only stream that forwards each Read call to the wrapped BinaryReader.
    /// Write throws NotImplementedException.
    /// </summary>
    class BinaryReaderStream : WrapperStream
        // The reader that supplies all bytes read from this stream.
        BinaryReader br;
        public BinaryReaderStream(BinaryReader br)
            this.br = br;
        public override bool CanRead
            get { return true; }
        public override bool CanWrite
            get { return false; }
        // Returns whatever the underlying reader's Read returns for the same buffer slice.
        public override int Read(byte[] buffer, int offset, int count)
            return br.Read(buffer, offset, count);
        public override void Write(byte[] buffer, int offset, int count)
            throw new NotImplementedException();

    #region "IBinarySerialize"
    /// <summary>
    /// Binary deserialization: wraps the reader in a BinaryReaderStream, decompresses it with
    /// GZipStream and deserializes a DataTable into <c>rowset</c> using BinaryFormatter.
    /// </summary>
    /// <param name="r">Reader positioned on the compressed, serialized value.</param>
    // NOTE(review): BinaryFormatter deserialization is unsafe for untrusted input -
    // confirm that only values produced by Write are ever read here.
    public void Read(System.IO.BinaryReader r)
        using (var rs = new BinaryReaderStream(r))
        using (var cs = new GZipStream(rs, CompressionMode.Decompress))
            var ser = new BinaryFormatter();
            this.rowset = (DataTable)ser.Deserialize(cs);
    /// <summary>
    /// Binary serialization: serializes <c>rowset</c> with BinaryFormatter into a GZipStream
    /// whose output is forwarded to the supplied writer through a BinaryWriterStream.
    /// </summary>
    /// <param name="w">Writer that receives the compressed bytes.</param>
    public void Write(System.IO.BinaryWriter w)
        // NOTE(review): the statement guarded by this check is blank in this listing
        // (presumably an early return for a null value) - confirm.
        if (this.IsNull)

        // Switch the DataTable to binary remoting format before serializing it.
        rowset.RemotingFormat = SerializationFormat.Binary;
        var ser = new BinaryFormatter();
        using (var binaryWriterStream = new BinaryWriterStream(w))
        using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress))
            ser.Serialize(compressionStream, rowset);



    /// <summary>
    /// SQL function that takes an arbitrary query, runs it on the context connection and returns
    /// the results wrapped in a CompressedRowset.
    /// If the query has a large result set, then this function will use a large amount of memory
    /// to buffer the results in a DataTable.
    /// </summary>
    /// <param name="query">Query text; it is handed to SqlCommand exactly as supplied by the caller.</param>
    /// <returns>A CompressedRowset whose <c>rowset</c> is the DataTable created here.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
        //open a context connection
        // NOTE(review): no con.Open() call is visible in this listing - confirm against the original source.
        using (var con = new SqlConnection("Context Connection=true"))
            var cmd = new SqlCommand(query, con);
            var dt = new DataTable();
            // NOTE(review): the body of this using (presumably loading rdr into dt) is not visible here.
            using (var rdr = cmd.ExecuteReader())
            //configure the DataTable for binary serialization
            dt.RemotingFormat = SerializationFormat.Binary;
            // bf is not used again in the lines shown.
            var bf = new BinaryFormatter();

            var cdt = new CompressedRowset();
            cdt.rowset = dt;
            return cdt;


    /// <summary>
    /// Partial type mapping between .NET and SQL: returns the SqlDbType used for a given CLR type.
    /// </summary>
    /// <param name="t">CLR type to map.</param>
    /// <returns>The SqlDbType paired with <paramref name="t"/> in the checks below.</returns>
    /// <exception cref="NotImplementedException">Thrown for a type that none of the checks below match.</exception>
    static SqlDbType ToSqlType(Type t)
        if (t == typeof(int))
            return SqlDbType.Int;
        if (t == typeof(string))
            return SqlDbType.NVarChar;
        if (t == typeof(Boolean))
            return SqlDbType.Bit;
        if (t == typeof(decimal))
            return SqlDbType.Decimal;
        // Note the naming: CLR float maps to SQL Real, CLR double maps to SQL Float.
        if (t == typeof(float))
            return SqlDbType.Real;
        if (t == typeof(double))
            return SqlDbType.Float;
        if (t == typeof(DateTime))
            return SqlDbType.DateTime;
        if (t == typeof(Int64))
            return SqlDbType.BigInt;
        if (t == typeof(Int16))
            return SqlDbType.SmallInt;
        if (t == typeof(byte))
            return SqlDbType.TinyInt;
        if ( t == typeof(Guid))
            return SqlDbType.UniqueIdentifier;
        if (t == typeof(Byte[]))
            return SqlDbType.VarBinary;
            // Reached only when no mapping above matched.
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");


    /// <summary>
    /// This stored procedure takes a compressed DataTable and returns it as a resultset to the client
    /// or into a table using exec .... into ...
    /// </summary>
    /// <param name="results">The CompressedRowset whose DataTable is to be returned.</param>
    // NOTE(review): the code that sends rows to the client is not visible in this listing;
    // only the metadata setup and the start of the row loop are shown.
    public static void UnCompressRowset(CompressedRowset results)
        // NOTE(review): the statement guarded by this check is blank in this listing
        // (presumably an early return) - confirm.
        if (results.IsNull)

        DataTable dt = results.rowset;
        // Build one SqlMetaData entry per DataTable column.
        var fields = new SqlMetaData[dt.Columns.Count];
        for (int i = 0; i < dt.Columns.Count; i++)
            var col = dt.Columns[i];
            var sqlType = ToSqlType(col.DataType);
            var colName = col.ColumnName;
            // NVarChar and VarBinary columns also pass the column's MaxLength.
            // NOTE(review): the two assignments below are presumably the if and else branches
            // (the else line is not visible in this listing).
            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
                fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength);
                fields[i] = new SqlMetaData(colName, sqlType);
        var record = new SqlDataRecord(fields);

        // NOTE(review): the loop body is not visible in this listing.
        foreach (DataRow row in dt.Rows)


    /// <summary>
    /// Always returns null; no XML schema is supplied for this type.
    /// </summary>
    public System.Xml.Schema.XmlSchema GetSchema()
        return null;

    /// <summary>
    /// Deserializes a DataTable from the reader with XmlSerializer and stores it in <c>rowset</c>.
    /// </summary>
    /// <param name="reader">XML reader positioned on the serialized DataTable.</param>
    /// <exception cref="InvalidOperationException">Thrown when <c>rowset</c> has already been set.</exception>
    public void ReadXml(System.Xml.XmlReader reader)
        // A rowset may only be populated once.
        if (rowset != null)
            throw new InvalidOperationException("rowset already read");
        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);

    /// <summary>
    /// Serializes <c>rowset</c> to the writer with XmlSerializer, first naming the table "Rows"
    /// if it has no name.
    /// </summary>
    /// <param name="writer">XML writer that receives the serialized DataTable.</param>
    // NOTE(review): rowset is dereferenced without a null check - confirm callers never reach
    // this on a null value.
    public void WriteXml(System.Xml.XmlWriter writer)
        // Give an unnamed table a name before serializing it.
        if (String.IsNullOrEmpty(rowset.TableName))
            rowset.TableName = "Rows";

        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);

这里是 SQL 对象创建:

-- Register the CLR user-defined type from class [CompressedRowset] in assembly [CompressedRowset].
CREATE TYPE [dbo].[CompressedRowset]
     EXTERNAL NAME [CompressedRowset].[CompressedRowset];


-- Expose the static method CompressQueryResults as a scalar function returning the UDT.
-- Note that the query text is limited to nvarchar(4000).
CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];


-- Expose the static method UnCompressRowset as a stored procedure taking the UDT as its parameter.
CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];


您是否考虑过创建一个新的 'Archive' 数据库(可能设置为简单恢复模式),在其中转储所有旧数据?这可以很容易地在查询中访问,所以没有痛苦,例如

SELECT * FROM archive..olddata

创建该数据库时,把它放在另一块磁盘上,并在备份流程中区别对待——比如每周只执行一次归档,那么只需在归档之后备份一次——而且备份文件还可以用 7zip/rar 压缩到几乎为零。

不要尝试使用 NTFS 压缩来压缩数据库,SQL Server 不支持这种做法。

对于最初的提问者来说可能为时已晚,但对于其他偶然看到这里的人,这可能值得考虑:从 SQL Server 2016 开始,提供了 COMPRESS 和 DECOMPRESS 函数(请参阅官方文档),如果您要归档的数据在 [N]VARCHAR 或 VARBINARY 列中包含较大的值,它们在这里可能很有用。

您需要把它融入业务逻辑层,或者在 SQL Server 中做一些安排:把未压缩的表改为一个视图,视图建立在后备表(压缩值实际存放的位置)之上,通过 DECOMPRESS 得到未压缩的数据,并用 INSTEAD OF 触发器更新后备表(这样除了性能差异之外,该视图在 select/insert/update/delete 时的行为与原始表一致)。有点 hacky,但可行……

对于较旧的 SQL 版本,您也可以编写一个 CLR 函数来完成这项工作。
