Compressing row set using CLR and GZIP
I want to compress some big tables containing historical data that is rarely or never read. I first tried the built-in compression options (row, page, columnstore, columnstore archive), but none of them can compress out-of-row values (varchar(max), nvarchar(max)), so I finally ended up trying a CLR solution.
The SQL Server Compressed Rowset Sample solution compresses the whole row set returned by a given query using a user-defined CLR type.
For example:
CREATE TABLE Archive
(
[Date] DATETIME2 DEFAULT(GETUTCDATE())
,[Data] [dbo].[CompressedRowset]
)
INSERT INTO Archive([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')
It is working, but I ran into the following problems:
When I try to compress a large result row set, I get the following error:
Msg 0, Level 11, State 0, Line 0 A severe error occurred on the
current command. The results, if any, should be discarded.
Also, the following statement works:
SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
but these do not:
INSERT INTO Archive
SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
DECLARE @A [dbo].[CompressedRowset]
SELECT @A = [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
To compress a row set, each T-SQL type has to be mapped to a .NET type; unfortunately, that is not the case for all SQL types (see Mapping CLR Parameter Data). I have already extended the following function to handle more types, but how do I handle types like geography, for example:
static SqlDbType ToSqlType(Type t){
if (t == typeof(int)){
return SqlDbType.Int;
}
...
if (t == typeof(Byte[])){
return SqlDbType.VarBinary;
} else {
throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
}
}
Here is the full .NET code:
using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;
[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
(
Format.UserDefined
,IsByteOrdered = false
,IsFixedLength = false
,MaxByteSize = -1
)
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
DataTable rowset;
public DataTable Data
{
get { return this.rowset; }
set { this.rowset = value; }
}
public override string ToString()
{
using (var sw = new StringWriter())
using (var xw = new XmlTextWriter(sw))
{
WriteXml(xw);
xw.Flush();
sw.Flush();
return sw.ToString();
}
}
public bool IsNull
{
get { return (this.rowset == null);}
}
public static CompressedRowset Null
{
get
{
CompressedRowset h = new CompressedRowset();
return h;
}
}
public static CompressedRowset Parse(SqlString s)
{
using (var sr = new StringReader(s.Value))
using (var xr = new XmlTextReader(sr))
{
var c = new CompressedRowset();
c.ReadXml(xr);
return c;
}
}
#region "Stream Wrappers"
abstract class WrapperStream : Stream
{
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return false; }
}
public override void Flush()
{
}
public override long Length
{
get { throw new NotImplementedException(); }
}
public override long Position
{
get
{
throw new NotImplementedException();
}
set
{
throw new NotImplementedException();
}
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
}
class BinaryWriterStream : WrapperStream
{
BinaryWriter br;
public BinaryWriterStream(BinaryWriter br)
{
this.br = br;
}
public override bool CanRead
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
br.Write(buffer, offset, count);
}
}
class BinaryReaderStream : WrapperStream
{
BinaryReader br;
public BinaryReaderStream(BinaryReader br)
{
this.br = br;
}
public override bool CanRead
{
get { return true; }
}
public override bool CanWrite
{
get { return false; }
}
public override int Read(byte[] buffer, int offset, int count)
{
return br.Read(buffer, offset, count);
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
}
#endregion
#region "IBinarySerialize"
public void Read(System.IO.BinaryReader r)
{
using (var rs = new BinaryReaderStream(r))
using (var cs = new GZipStream(rs, CompressionMode.Decompress))
{
var ser = new BinaryFormatter();
this.rowset = (DataTable)ser.Deserialize(cs);
}
}
public void Write(System.IO.BinaryWriter w)
{
if (this.IsNull)
return;
rowset.RemotingFormat = SerializationFormat.Binary;
var ser = new BinaryFormatter();
using (var binaryWriterStream = new BinaryWriterStream(w))
using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress))
{
ser.Serialize(compressionStream, rowset);
}
}
#endregion
/// <summary>
/// This procedure takes an arbitrary query, runs it and compresses the results into a varbinary(max) blob.
/// If the query has a large result set, then this procedure will use a large amount of memory to buffer the results in
/// a DataTable, and more to copy it into a compressed buffer to return.
/// </summary>
/// <param name="query"></param>
/// <param name="results"></param>
//[Microsoft.SqlServer.Server.SqlProcedure]
[SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
public static CompressedRowset CompressQueryResults(string query)
{
//open a context connection
using (var con = new SqlConnection("Context Connection=true"))
{
con.Open();
var cmd = new SqlCommand(query, con);
var dt = new DataTable();
using (var rdr = cmd.ExecuteReader())
{
dt.Load(rdr);
}
//configure the DataTable for binary serialization
dt.RemotingFormat = SerializationFormat.Binary;
var bf = new BinaryFormatter();
var cdt = new CompressedRowset();
cdt.rowset = dt;
return cdt;
}
}
/// <summary>
/// partial Type mapping between SQL and .NET
/// </summary>
/// <param name="t"></param>
/// <returns></returns>
static SqlDbType ToSqlType(Type t)
{
if (t == typeof(int))
{
return SqlDbType.Int;
}
if (t == typeof(string))
{
return SqlDbType.NVarChar;
}
if (t == typeof(Boolean))
{
return SqlDbType.Bit;
}
if (t == typeof(decimal))
{
return SqlDbType.Decimal;
}
if (t == typeof(float))
{
return SqlDbType.Real;
}
if (t == typeof(double))
{
return SqlDbType.Float;
}
if (t == typeof(DateTime))
{
return SqlDbType.DateTime;
}
if (t == typeof(Int64))
{
return SqlDbType.BigInt;
}
if (t == typeof(Int16))
{
return SqlDbType.SmallInt;
}
if (t == typeof(byte))
{
return SqlDbType.TinyInt;
}
if ( t == typeof(Guid))
{
return SqlDbType.UniqueIdentifier;
}
//!!!!!!!!!!!!!!!!!!!
if (t == typeof(Byte[]))
{
return SqlDbType.VarBinary;
}
else
{
throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
}
}
/// <summary>
/// This stored procedure takes a compressed DataTable and returns it as a resultset to the client
/// or into a table using exec .... into ...
/// </summary>
/// <param name="results"></param>
[Microsoft.SqlServer.Server.SqlProcedure]
public static void UnCompressRowset(CompressedRowset results)
{
if (results.IsNull)
return;
DataTable dt = results.rowset;
var fields = new SqlMetaData[dt.Columns.Count];
for (int i = 0; i < dt.Columns.Count; i++)
{
var col = dt.Columns[i];
var sqlType = ToSqlType(col.DataType);
var colName = col.ColumnName;
if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
{
fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength);
}
else
{
fields[i] = new SqlMetaData(colName, sqlType);
}
}
var record = new SqlDataRecord(fields);
SqlContext.Pipe.SendResultsStart(record);
foreach (DataRow row in dt.Rows)
{
record.SetValues(row.ItemArray);
SqlContext.Pipe.SendResultsRow(record);
}
SqlContext.Pipe.SendResultsEnd();
}
public System.Xml.Schema.XmlSchema GetSchema()
{
return null;
}
public void ReadXml(System.Xml.XmlReader reader)
{
if (rowset != null)
{
throw new InvalidOperationException("rowset already read");
}
var ser = new XmlSerializer(typeof(DataTable));
rowset = (DataTable)ser.Deserialize(reader);
}
public void WriteXml(System.Xml.XmlWriter writer)
{
if (String.IsNullOrEmpty(rowset.TableName))
rowset.TableName = "Rows";
var ser = new XmlSerializer(typeof(DataTable));
ser.Serialize(writer, rowset);
}
}
Here is the SQL object creation:
CREATE TYPE [dbo].[CompressedRowset]
EXTERNAL NAME [CompressedRowset].[CompressedRowset];
GO
CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];
GO
CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];
GO
Have you considered creating a new 'Archive' database (perhaps set to simple recovery mode) and dumping all the old data in there? It can easily be accessed from queries, so there is no pain, e.g.
SELECT * FROM archive..olddata
When you create the database, put it on another disk and handle it differently in your backup procedures; perhaps you run the archiving process once a week and only need a backup after that, after which you can squash the backup down to almost nothing with 7zip/rar.
Don't try to compress the database with NTFS compression; SQL Server does not support it.
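A minimal sketch of that arrangement (the database name, file path, table name and one-year cut-off are all hypothetical; in real use you would wrap the move in a transaction):
-- Archive database on a separate disk, simple recovery model
CREATE DATABASE [Archive]
ON PRIMARY (NAME = N'Archive', FILENAME = N'D:\ArchiveData\Archive.mdf');
GO
ALTER DATABASE [Archive] SET RECOVERY SIMPLE;
GO
-- First run: move old rows across (SELECT INTO creates the target table)
SELECT *
INTO [Archive].[dbo].[OldData]
FROM [dbo].[OldData]
WHERE [Date] < DATEADD(YEAR, -1, GETUTCDATE());

DELETE FROM [dbo].[OldData]
WHERE [Date] < DATEADD(YEAR, -1, GETUTCDATE());
GO
-- The archived rows stay reachable from ordinary queries
SELECT * FROM [Archive].[dbo].[OldData];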
Probably too late for the original question, but it may be worth considering for anyone else who stumbles across this: in SQL Server 2016 there are COMPRESS and DECOMPRESS functions (see here and here), which may be useful if the data you are trying to archive contains large values in [N]VARCHAR and VARBINARY columns.
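For example, a minimal sketch of the basic usage (table and column names are hypothetical); COMPRESS produces a GZIP-compressed VARBINARY(MAX), and DECOMPRESS gives back VARBINARY(MAX) that has to be cast to the original type:
-- SQL Server 2016 or later only
CREATE TABLE dbo.ArchivedNotes
(
    Id   INT IDENTITY PRIMARY KEY
   ,Note VARBINARY(MAX)          -- holds COMPRESS(...) output
);

-- compress on the way in
INSERT INTO dbo.ArchivedNotes (Note)
SELECT COMPRESS(Note) FROM dbo.Notes;

-- decompress on the way out and cast back to the original type
SELECT Id, CAST(DECOMPRESS(Note) AS NVARCHAR(MAX)) AS Note
FROM dbo.ArchivedNotes;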
You would need to bake this into your business logic layer, or arrange something in SQL Server whereby the uncompressed table is exposed as a view over a backing table (where the compressed values live), with the uncompressed data derived via DECOMPRESS and with INSTEAD OF triggers keeping the backing table updated (so the view behaves like the original table, aside from the performance differences). A bit hacky, but it would work...
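A rough sketch of that view/trigger arrangement (all names are hypothetical, and only the INSERT trigger is shown; UPDATE and DELETE triggers would be needed as well):
-- backing table holds the compressed values
CREATE TABLE dbo.NotesBacking
(
    Id             INT IDENTITY PRIMARY KEY
   ,NoteCompressed VARBINARY(MAX)
);
GO
-- the view exposes the data uncompressed, under the name existing code expects
CREATE VIEW dbo.Notes
AS
SELECT Id, CAST(DECOMPRESS(NoteCompressed) AS NVARCHAR(MAX)) AS Note
FROM dbo.NotesBacking;
GO
-- INSTEAD OF trigger compresses on insert, so the view behaves like a table
CREATE TRIGGER dbo.Notes_Insert ON dbo.Notes
INSTEAD OF INSERT
AS
BEGIN
    INSERT INTO dbo.NotesBacking (NoteCompressed)
    SELECT COMPRESS(Note) FROM inserted;
END;
GO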
For older SQL versions, you could probably write a CLR function to do the same job.
This approach obviously isn't for data sets made up of small fields, of course; this kind of compression simply doesn't pay off on small values (it will actually make them bigger).