C# export SQL Server large tables in batches/chunks using DataReader and CSV writer

I have developed a CSV batching program, but compared with BCP the process seems quite slow. My only requirement is to export large tables that have no identity or primary key column into multiple smaller CSV files, named with the corresponding batch ID.

The problem with BCP is that it only writes a single large file.

My current process is: read the data and write it to a memory stream with a CSV writer, keep checking whether the memory stream has exceeded a particular batch size, and then asynchronously copy the memory stream out to a text file.

I can export 250 MB files per batch without running out of memory,

but the process takes about 5x longer than a BCP export.

Is there a better way to implement a batched CSV export than what I am doing now?

Please advise.
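Roughly, the current flow looks like the sketch below (the table, connection string, 250 MB threshold and file naming are placeholders, not the actual code; the real implementation uses a CSV writer library and copies the stream asynchronously):

using System;
using System.Data.SqlClient;
using System.IO;

class BatchedCsvExport
{
    // ~250 MB per batch file before it is flushed out to disk
    const long BatchSizeBytes = 250L * 1024 * 1024;

    static void Main()
    {
        int batchId = 0;
        using (var con = new SqlConnection(@"Server=(local);Database=Demo;Integrated Security=true;"))
        {
            con.Open();
            using (var cmd = new SqlCommand("SELECT * FROM dbo.BigHeapTable", con))
            using (var reader = cmd.ExecuteReader())
            {
                var ms = new MemoryStream();
                var writer = new StreamWriter(ms);

                while (reader.Read())
                {
                    for (int i = 0; i < reader.FieldCount; i++)
                    {
                        if (i != 0) writer.Write(',');
                        writer.Write(Convert.ToString(reader[i]));
                    }
                    writer.Write("\r\n");

                    // Roll over to a new batch file once the buffered data exceeds
                    // the batch size (StreamWriter buffers a few KB, so this is approximate)
                    if (ms.Length > BatchSizeBytes)
                    {
                        writer.Flush();
                        WriteBatchFile(ms, batchId++);
                        ms = new MemoryStream();
                        writer = new StreamWriter(ms);
                    }
                }

                writer.Flush();
                if (ms.Length > 0) WriteBatchFile(ms, batchId++);
            }
        }
    }

    static void WriteBatchFile(MemoryStream ms, int batchId)
    {
        ms.Position = 0;
        using (var fs = File.Create($"batch_{batchId}.csv"))
        {
            ms.CopyTo(fs);
        }
    }
}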

A few options come to mind:

Use FETCH / OFFSET

If the source query batches easily on the SQL Server side (for example, a clustered index you can page over), FETCH and OFFSET are basically free.

If the table is a heap, FETCH/OFFSET isn't really an option, but you could consider adding a clustered index, since there aren't many good reasons not to (although doing that to a 100 GB table would be expensive :)

bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w

Use SqlDataReader

Based on measurements against a ~1.2 GB table, a naive implementation of a C# CSV SQL export (below) achieved about 75% of BCP's performance on the same table and system. (It also has the advantage of handling the CSV format correctly with respect to embedded commas, quotes and CRLFs.)

using System;
using System.Data.SqlClient;
using System.IO;
using System.Text;

static void Main(string[] args)
{
    var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;");
    con.Open();

    var sqr = new SqlCommand("SELECT * FROM dbo.Table", con);

    using (var reader = sqr.ExecuteReader())
    using (var tw = File.CreateText("out.csv"))
    {
        while (reader.Read())
        {
            for (int i = 0; i < reader.FieldCount; i++)
            {
                if (i != 0)
                {
                    tw.Write(',');
                }

                var val = FormatValue(reader[i]);
                if (val == null)
                {
                    // no-op
                }
                else if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
                {
                    tw.Write('"');
                    tw.Write(val.Replace("\"", "\"\""));
                    tw.Write('"');
                }
                else
                {
                    tw.Write(val);
                }
            }
            tw.Write("\r\n");
        }
    }
}

private static string FormatValue(object v)
{
    if (v == null)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        return dt.ToString("O");
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        for (int i = 0; i < ba.Length; i++)
        {
            sb.Append(ba[i].ToString("X2"));
        }
        return sb.ToString();
    }
    return v.ToString();
}
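Since the requirement is several batch files rather than a single out.csv, the same loop can roll over to a new numbered file once the current one passes a size threshold. A sketch of that variation (the 100 MB default, the out{n}.csv naming and the ExportInBatches wrapper are assumptions; FormatValue is the method shown above):

static void ExportInBatches(SqlConnection con, string query, long maxBytesPerFile = 100L * 1024 * 1024)
{
    int fileNumber = 0;
    using (var cmd = new SqlCommand(query, con))
    using (var reader = cmd.ExecuteReader())
    {
        var tw = File.CreateText($"out{fileNumber++}.csv");
        try
        {
            int rowsSinceCheck = 0;
            while (reader.Read())
            {
                for (int i = 0; i < reader.FieldCount; i++)
                {
                    if (i != 0)
                    {
                        tw.Write(',');
                    }

                    var val = FormatValue(reader[i]);
                    if (val == null)
                    {
                        // no-op
                    }
                    else if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
                    {
                        tw.Write('"');
                        tw.Write(val.Replace("\"", "\"\""));
                        tw.Write('"');
                    }
                    else
                    {
                        tw.Write(val);
                    }
                }
                tw.Write("\r\n");

                // Check the file size every few thousand rows; Flush() first so
                // Length includes data still sitting in the writer's buffer.
                if (++rowsSinceCheck == 10000)
                {
                    rowsSinceCheck = 0;
                    tw.Flush();
                    if (((FileStream)tw.BaseStream).Length > maxBytesPerFile)
                    {
                        tw.Dispose();
                        tw = File.CreateText($"out{fileNumber++}.csv");
                    }
                }
            }
        }
        finally
        {
            tw.Dispose();
        }
    }
}

Each file ends on a complete row, because the size check only runs after a full "\r\n" has been written.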

Performance appears to be limited by the GC dealing with so many string allocations - so if higher performance is required, translating the same approach to a non-CLR language (for example, C++) would likely match BCP's performance.

Use SSIS

SSIS can perform all the steps in one package. The exact steps are probably best left to another question, but they basically amount to synthesizing a column for the "file number" and using a Flat File destination. (Bad example of this.)

Use SSIS to produce one big CSV, then split it

If you use SSIS (directly, or via the Export Data Wizard), you will get an RFC 4180 compliant CSV file that can be split. An example tool to split such a file:

using System;
using System.IO;

class Program
{
    static void Main(string[] args)
    {
        int n = 0;
        using (var src = File.OpenRead("rfc4180_in.csv"))
        using (var dst = new CsvRfc4180SplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
        {
            src.CopyTo(dst);
        }
    }
}

/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least 
/// cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;
    private long CurrentStreamPos;
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;

        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing) => CurrentStream.Dispose();

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // ignore count to always exceed cutAfterPosition
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }

            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }

            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
        }

        CurrentStreamPos += count;
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}

class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;
    bool lastWasQuote;
    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);

            // check for CRLF if desired and not escaped
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                cutPoint = n;
            }
        }

        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;

            if (isQuote)
            {
                // Double quotes:
                //  nop
                //  Inside quoted string == literal escape
                //  Outside quoted string == empty string
            }
            else
            {
                // quote with non-quote following == toggle quoted string
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}

Use BCP, then split on the fly

If BCP is required and its (mis)handling of the CSV format is tolerable, it can write to a named pipe stream so the output can be split on the fly.

using System;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Thread copyThread;
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires read/write pipe
        using (var np = new NamedPipeServerStream(pipeId))
        {
            copyThread = new Thread(_1 =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF-16 Little Endian)
                // Use CrlfUtf8SplittingWriteStream otherwise (UTF-8 / ANSI / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();

            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC0\Tools\Binn\bcp.exe",
                $@"FWDB.Rx.RxBatches out \.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();
        }
        copyThread.Join();
    }
}

class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0)
                {
                    // split after CRLF
                    return n + 4;
                }
            }
        }

        return -1;
    }
}

class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 1 /* CR LF */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == '\n')
                {
                    // split after CRLF
                    return n + 2;
                }
            }
        }

        return -1;
    }
}

// SplittingWriteStream is the same abstract base class shown in the listing above.