使用 CLR 和 GZIP 压缩行集

Compressing row set using CLR and GZIP

我想压缩一些大型 table,其中包含很少读取或根本不再读取的历史数据。我首先尝试了内置的压缩方式(row、page、column stored、column-stored archive),但它们都无法压缩行外值(varchar(max)、nvarchar(max)),所以最后尝试使用 CLR 解决方案。

SQL Server Compressed Rowset Sample 解决方案使用用户定义的 CLR 类型压缩给定查询返回的整个行集。

例如:

-- Archive table: each row holds one compressed snapshot of a query result.
CREATE TABLE Archive
(
    [Date] DATETIME2 DEFAULT (GETUTCDATE()),  -- defaults to the UTC time of the insert
    [Data] [dbo].[CompressedRowset]           -- result set stored through the CLR user-defined type
)

-- Run the query text through the CLR function and store its result as a single Archive row.
INSERT INTO Archive ([Data])
SELECT
    [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')

它正在运行,但我遇到了以下问题:

这是完整的 .net 代码:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

/// <summary>
/// SQL CLR user-defined type that carries the result set of a query as a <see cref="DataTable"/>.
/// Its binary form (<c>IBinarySerialize</c>) is the table serialized with <see cref="BinaryFormatter"/>
/// and compressed with GZip; its text form (<c>ToString</c>/<c>Parse</c>) is the XML serialization
/// of the table.
/// </summary>
/// <remarks>
/// SECURITY NOTE(review): <see cref="BinaryFormatter"/> is unsafe when it deserializes data from an
/// untrusted source. It is kept here because the stored format depends on it; confirm that only
/// trusted principals can write values of this type.
/// </remarks>
[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
    (
        Format.UserDefined
        ,IsByteOrdered = false
        ,IsFixedLength = false
        ,MaxByteSize = -1
    )
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    // Largest explicit lengths SqlMetaData accepts; anything longer has to be declared as MAX.
    const int MaxNVarCharLength = 4000;
    const int MaxVarBinaryLength = 8000;

    // The wrapped rows; null means this instance is the SQL NULL value.
    DataTable rowset;

    /// <summary>Gets or sets the wrapped table (null for the NULL instance).</summary>
    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    /// <summary>
    /// Returns the XML form of the wrapped table, or the literal "NULL" for the NULL instance.
    /// </summary>
    public override string ToString()
    {
        // The NULL instance has no table to serialize; without this guard WriteXml would
        // dereference a null rowset.
        if (this.IsNull)
        {
            return "NULL";
        }

        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            sw.Flush();
            return sw.ToString();
        }
    }

    /// <summary>True when no table is attached, i.e. the instance represents SQL NULL.</summary>
    public bool IsNull
    {
        get { return this.rowset == null; }
    }

    /// <summary>Gets the NULL instance (a default struct with no table attached).</summary>
    public static CompressedRowset Null
    {
        get { return new CompressedRowset(); }
    }

    /// <summary>
    /// Builds an instance from the XML text produced by <see cref="ToString"/>.
    /// </summary>
    /// <param name="s">XML serialization of a DataTable; SQL NULL yields the NULL instance.</param>
    /// <returns>The parsed instance.</returns>
    public static CompressedRowset Parse(SqlString s)
    {
        // SqlString.Value throws on SQL NULL, so map NULL input to the NULL instance first.
        if (s.IsNull)
        {
            return Null;
        }

        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var c = new CompressedRowset();
            c.ReadXml(xr);
            return c;
        }
    }


    #region "Stream Wrappers"
    /// <summary>
    /// Base for the forward-only adapters below: no seeking, no length, no position.
    /// Unsupported members throw <see cref="NotSupportedException"/>, as the Stream contract specifies.
    /// </summary>
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {
            // Nothing is buffered at this level.
        }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Write-only stream that forwards every write to a <see cref="BinaryWriter"/>.</summary>
    class BinaryWriterStream : WrapperStream
    {
        BinaryWriter writer;

        public BinaryWriterStream(BinaryWriter br)
        {
            this.writer = br;
        }

        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            writer.Write(buffer, offset, count);
        }
    }

    /// <summary>Read-only stream that pulls its bytes from a <see cref="BinaryReader"/>.</summary>
    class BinaryReaderStream : WrapperStream
    {
        BinaryReader reader;

        public BinaryReaderStream(BinaryReader br)
        {
            this.reader = br;
        }

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return reader.Read(buffer, offset, count);
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }
    }
    #endregion

    #region "IBinarySerialize"
    /// <summary>
    /// Restores the table from its stored form: GZip-decompress, then BinaryFormatter-deserialize.
    /// </summary>
    /// <param name="r">Reader positioned at the start of the stored bytes.</param>
    public void Read(System.IO.BinaryReader r)
    {
        using (var source = new BinaryReaderStream(r))
        using (var unzip = new GZipStream(source, CompressionMode.Decompress))
        {
            // SECURITY NOTE(review): BinaryFormatter on untrusted bytes is dangerous; see type remarks.
            var formatter = new BinaryFormatter();
            this.rowset = (DataTable)formatter.Deserialize(unzip);
        }
    }

    /// <summary>
    /// Stores the table as a GZip-compressed BinaryFormatter payload. Writes nothing for the NULL instance.
    /// </summary>
    /// <param name="w">Writer that receives the compressed bytes.</param>
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
        {
            return;
        }

        // Binary remoting format makes the DataTable serialize compactly instead of as XML.
        rowset.RemotingFormat = SerializationFormat.Binary;
        var formatter = new BinaryFormatter();
        using (var sink = new BinaryWriterStream(w))
        using (var zip = new GZipStream(sink, CompressionMode.Compress))
        {
            formatter.Serialize(zip, rowset);
        }
    }

    #endregion

    /// <summary>
    /// Runs an arbitrary query on the context connection and wraps its first result set in a
    /// <see cref="CompressedRowset"/>. The whole result set is buffered in a DataTable, so a large
    /// result uses a correspondingly large amount of memory; compression itself happens later,
    /// when the value is serialized through <c>Write</c>.
    /// </summary>
    /// <param name="query">
    /// SQL text to execute as-is (callers must only pass trusted text). NULL or empty yields the NULL instance.
    /// </param>
    /// <returns>An instance holding the query's rows, or the NULL instance when there is no query.</returns>
    //[Microsoft.SqlServer.Server.SqlProcedure]
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        // Without command text ExecuteReader cannot run; treat "no query" as "no rows" (NULL).
        if (String.IsNullOrEmpty(query))
        {
            return Null;
        }

        // Open a context connection (the caller's own session).
        using (var con = new SqlConnection("Context Connection=true"))
        {
            con.Open();

            var dt = new DataTable();
            using (var cmd = new SqlCommand(query, con))
            using (var rdr = cmd.ExecuteReader())
            {
                dt.Load(rdr);
            }

            // Configure the DataTable for binary serialization.
            dt.RemotingFormat = SerializationFormat.Binary;

            var cdt = new CompressedRowset();
            cdt.rowset = dt;
            return cdt;
        }
    }

    /// <summary>
    /// Partial type mapping from .NET column types to SQL Server types.
    /// </summary>
    /// <param name="t">The .NET type of a DataTable column.</param>
    /// <returns>The matching <see cref="SqlDbType"/>.</returns>
    /// <exception cref="NotImplementedException">The type has no mapping in this table.</exception>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int))
        {
            return SqlDbType.Int;
        }
        if (t == typeof(string))
        {
            return SqlDbType.NVarChar;
        }
        if (t == typeof(Boolean))
        {
            return SqlDbType.Bit;
        }
        if (t == typeof(decimal))
        {
            return SqlDbType.Decimal;
        }
        if (t == typeof(float))
        {
            return SqlDbType.Real;
        }
        if (t == typeof(double))
        {
            return SqlDbType.Float;
        }
        if (t == typeof(DateTime))
        {
            return SqlDbType.DateTime;
        }
        if (t == typeof(Int64))
        {
            return SqlDbType.BigInt;
        }
        if (t == typeof(Int16))
        {
            return SqlDbType.SmallInt;
        }
        if (t == typeof(byte))
        {
            return SqlDbType.TinyInt;
        }
        if (t == typeof(Guid))
        {
            return SqlDbType.UniqueIdentifier;
        }
        if (t == typeof(Byte[]))
        {
            return SqlDbType.VarBinary;
        }

        throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
    }

    /// <summary>
    /// Converts a DataColumn length into a length SqlMetaData accepts for a variable-length type.
    /// </summary>
    /// <param name="sqlType">Either NVarChar or VarBinary.</param>
    /// <param name="declaredLength">The column's <c>MaxLength</c> as recorded in the DataTable.</param>
    /// <returns>The declared length when it is a valid explicit length, otherwise <c>SqlMetaData.Max</c>.</returns>
    static long ToMetaDataLength(SqlDbType sqlType, int declaredLength)
    {
        int limit = (sqlType == SqlDbType.NVarChar) ? MaxNVarCharLength : MaxVarBinaryLength;

        // Unset lengths (-1/0) and lengths beyond the explicit limit (e.g. (max) columns) are
        // declared as MAX; passing them through unchanged makes the SqlMetaData constructor throw.
        if (declaredLength < 1 || declaredLength > limit)
        {
            return SqlMetaData.Max;
        }
        return declaredLength;
    }

    /// <summary>
    /// This stored procedure takes a compressed DataTable and returns it as a result set to the client
    /// or into a table using insert ... exec ...
    /// </summary>
    /// <param name="results">The value to expand; nothing is sent for the NULL instance.</param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
        {
            return;
        }

        DataTable table = results.rowset;
        int columnCount = table.Columns.Count;
        var fields = new SqlMetaData[columnCount];
        for (int i = 0; i < columnCount; i++)
        {
            DataColumn column = table.Columns[i];
            SqlDbType sqlType = ToSqlType(column.DataType);
            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
            {
                // Variable-length types need an explicit (or MAX) length.
                fields[i] = new SqlMetaData(column.ColumnName, sqlType, ToMetaDataLength(sqlType, column.MaxLength));
            }
            else
            {
                // NOTE(review): precision/scale are not taken from the DataTable, so decimal columns
                // get SqlMetaData's defaults - confirm this does not lose fractional digits.
                fields[i] = new SqlMetaData(column.ColumnName, sqlType);
            }
        }

        // A single record object is reused for every row that is sent.
        var record = new SqlDataRecord(fields);

        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in table.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();
    }

    /// <summary>No schema is published for this type.</summary>
    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    /// <summary>Loads the table from the XML serialization of a DataTable.</summary>
    /// <param name="reader">Reader positioned on the serialized DataTable.</param>
    /// <exception cref="InvalidOperationException">A table has already been loaded into this instance.</exception>
    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }
        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);
    }

    /// <summary>
    /// Writes the table as XML. Writes nothing for the NULL instance.
    /// An unnamed table is given the name "Rows" before it is serialized.
    /// </summary>
    /// <param name="writer">Destination XML writer.</param>
    public void WriteXml(System.Xml.XmlWriter writer)
    {
        // The NULL instance has no table; emit no content instead of dereferencing null.
        if (rowset == null)
        {
            return;
        }

        if (String.IsNullOrEmpty(rowset.TableName))
        {
            rowset.TableName = "Rows";
        }

        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);
    }
}

这里是 SQL 对象创建:

-- Expose the CLR struct as a T-SQL user-defined type.
CREATE TYPE [dbo].[CompressedRowset]
EXTERNAL NAME [CompressedRowset].[CompressedRowset];

GO

-- Scalar function: runs @query and returns its result set wrapped in the UDT.
CREATE FUNCTION [dbo].[CompressQueryResults]
(
    @query [nvarchar](4000)
)
RETURNS [dbo].[CompressedRowset]
AS
EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];

GO

-- Procedure: sends the rows held in a UDT value back as a result set.
CREATE PROCEDURE [dbo].[UnCompressRowset]
    @results [dbo].[CompressedRowset]
AS
EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];

GO

您是否考虑过创建一个新的 'Archive' 数据库(可能设置为简单恢复模式),在其中转储所有旧数据?这可以很容易地在查询中访问,所以没有痛苦,例如

-- Read the archived rows via a three-part name; the schema between the dots is left to the default.
SELECT *
FROM archive..olddata

创建该数据库时,把它放在另一块磁盘上,并在备份流程中区别对待它——比如每周执行一次归档过程,那么只需要在归档之后备份一次——而且是在用 7zip/rar 把它压缩到几乎为零之后再备份。

不要尝试使用 NTFS 压缩来压缩数据库,SQL Server 不支持这样做。

对于最初的问题可能为时已晚,但对于其他偶然看到这里的人来说,这可能值得考虑:SQL Server 2016 提供了 COMPRESS 和 DECOMPRESS 函数(请参阅官方文档中对这两个函数的说明);如果您要存档的数据在 [N]VARCHAR 和 VARBINARY 列中包含较大的值,它们在这里可能很有用。

您需要把它融入业务逻辑层,或者在 SQL Server 中做一些安排:把原来未压缩的 table 改成一个建立在支持 table(压缩值实际存放的位置)之上的视图,通过 DECOMPRESS 得到未压缩的数据,并用 INSTEAD OF 触发器去更新支持 table(这样除了性能差异之外,该视图在 select/insert/update/delete 时的行为与原始 table 相同)。有点 hacky,但它可以工作……

对于较旧的 SQL 版本,您也可以编写一个 CLR 函数来完成这项工作。

这种方法显然不适用于由小字段组成的数据集,当然,这种压缩方式在小值上根本无济于事(实际上它会使它们变大)。