C# export SQL Server large tables in batches/chunks using DataReader and CSV writer
C# export SQL Server large tables in batches/chunks using DataReader and CSV writer
我开发了一个 CSV 批处理程序。
但与 BCP 相比,这个过程似乎相当缓慢。
我唯一的要求是将没有标识或主键列的大表导出到多个小型 CSV 文件中,并用相应的批次 ID 命名它们。
BCP 的问题是它只会写入一个大文件。
我目前的流程是:
读取数据并使用 CSV 写入器写入内存流
我一直在检查内存流是否大于特定的批处理大小,然后我将异步地复制内存流写入文本文件。
在没有内存不足的情况下,我可以批量导出 250MB 的文件
但是这个过程比BCP导出要多5倍的时间。
有没有比我现在做的更好的实现批量导出CSV的方法
请指教
我想到了几个选项:
使用 FETCH / OFFSET
如果源查询能够在 SQL 服务器中轻松地进行批处理(例如,您可以关闭的聚集索引),FETCH 和 OFFSET 基本上是免费的。
如果 table 是一个堆,FETCH/OFFSET 不是一个真正的选择,但您可以考虑添加聚集索引,因为没有太多好的理由反对这样做(尽管这样做100 GB table 会很贵 :)
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
使用 SqlDataReader
根据使用约 1.2 GB 表的测量结果,下面这个简单的 C# CSV SQL 导出实现,在相同的硬件和系统上达到了 BCP 约 75% 的性能。(它还有一个优势:能够正确处理 CSV 格式中嵌入的逗号、引号和 CRLF。)
static void Main(string[] args)
{
    // Hoisted out of the per-field loop: the original allocated this array once
    // per field per row, adding significant GC pressure on large exports (the
    // surrounding text notes the export is GC-bound).
    var mustQuote = new[] { '"', ',', '\r', '\n' };

    // using blocks so the connection/command/reader/file are released even on error
    // (the original never disposed the connection or command).
    using (var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;"))
    using (var sqr = new SqlCommand("SELECT * FROM dbo.Table", con))
    {
        con.Open();
        using (var reader = sqr.ExecuteReader())
        using (var tw = File.CreateText("out.csv"))
        {
            while (reader.Read())
            {
                for (int i = 0; i < reader.FieldCount; i++)
                {
                    if (i != 0)
                    {
                        tw.Write(',');
                    }
                    var val = FormatValue(reader[i]);
                    if (val == null)
                    {
                        // NULL -> empty (unquoted) field
                    }
                    else if (val.IndexOfAny(mustQuote) >= 0)
                    {
                        // RFC 4180 quoting: wrap the field and double embedded quotes
                        tw.Write('"');
                        tw.Write(val.Replace("\"", "\"\""));
                        tw.Write('"');
                    }
                    else
                    {
                        tw.Write(val);
                    }
                }
                tw.Write("\r\n");
            }
        }
    }
}
/// <summary>
/// Converts a data-reader cell value to its CSV text form, or null for NULL.
/// </summary>
private static string FormatValue(object v)
{
    // BUG FIX: SqlDataReader reports database NULL as DBNull.Value, never as a
    // CLR null, so the original null-only check could never match a NULL column.
    // Both now map to null (rendered as an empty field by the caller).
    if (v == null || v is DBNull)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        // Round-trip ("O") format: culture-invariant and lossless.
        return dt.ToString("O");
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        // Binary columns rendered as 0xHEX.
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        for (int i = 0; i < ba.Length; i++)
        {
            sb.Append(ba[i].ToString("X2"));
        }
        return sb.ToString();
    }
    // NOTE(review): v.ToString() uses the current culture for numeric types;
    // consider formatting with InvariantCulture for machine-stable output.
    return v.ToString();
}
性能似乎受到处理如此多字符串分配的 GC 的限制 - 因此,如果需要更高的性能,将其翻译成非 CLR 语言(例如 C++)可能会与 BCP 的性能相匹配。
使用 SSIS
SSIS 可以在一个包中执行所有步骤。确切的步骤可能最好留给另一个问题,但基本上相当于为“文件编号”合成一列并使用平面文件目标。 Bad example of this
使用 SSIS 生成一个大的 CSV,然后拆分它
如果您使用 SSIS(直接或通过 using the Export Data Wizard),您将获得一个可以拆分的符合 RFC 4180 标准的 CSV 文件。拆分此类文件的示例工具是:
class Program
{
    // Splits an RFC 4180 CSV into ~100 MB chunks, only ever cutting on a record
    // boundary (an unquoted CRLF).
    static void Main(string[] args)
    {
        var fileIndex = 0;
        using (var source = File.OpenRead("rfc4180_in.csv"))
        using (var splitter = new CsvRfc4180SplittingWriteStream(
            () => File.Create($"rfc4180_out{fileIndex++}.csv"),
            100 /* mb per chunk */ * 1024 * 1024))
        {
            source.CopyTo(splitter);
        }
    }
}
/// <summary>
/// Write-only stream that forwards to a sequence of underlying streams created by
/// a factory, starting a new one whenever ParseDataGetCutPoint reports a safe cut
/// point after at least cutAfterPosition bytes were written to the current stream.
/// Not thread-safe.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;    // bytes written across all chunks
    private long CurrentStreamPos;  // bytes written to the current chunk
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    protected SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Only release managed state when disposing, and chain to the base class
        // (the original skipped both and would throw if CurrentStream were null).
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask the parser for a cut point once the current chunk is big enough.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // BUG FIX: the original added the full count to CurrentStreamPos after
            // resetting it, overstating the new chunk's size by cutPoint bytes and
            // making every subsequent chunk cut earlier than configured.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: quoted-string state
/// persists across Write calls, so a CRLF inside a quoted field is never treated
/// as a record separator.
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;  // currently inside a "..." field
    bool lastWasQuote;    // previous byte was '"' (needed to decode "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // An unquoted CRLF ends a record; remember the first such position.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // BUG FIX: cut AFTER the CRLF (n + 2), consistent with the other
                // splitting streams. The original cut before it, so the previous
                // chunk lost its trailing newline and the next chunk began with a
                // blank line.
                cutPoint = n + 2;
            }
            // Keep scanning: StepState must see every byte to stay in sync.
        }
        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // "" is either an escaped quote (inside a field) or an empty
                // field (outside) — either way, quoted state is unchanged.
            }
            else
            {
                // A quote followed by a non-quote toggles quoted-string state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
使用 BCP,然后即时拆分
如果需要 BCP,并且它对 CSV 的(错误)处理是可以容忍的,它可以写入命名管道流以即时拆分。
class Program
{
    // Runs bcp.exe writing to a named pipe and splits the piped output into
    // ~100 MB files on CRLF boundaries as the data arrives.
    static void Main(string[] args)
    {
        Thread copyThread;
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires a read/write pipe
        using (var np = new NamedPipeServerStream(pipeId))
        {
            copyThread = new Thread(_1 =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF-16 Little Endian)
                // Use CrlfUtf8SplittingWriteStream otherwise (UTF-8 / ANSI / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();
            // NOTE(review): the bcp.exe path ("...\ODBC0\...") looks garbled —
            // confirm the actual install path, e.g. "...\ODBC\170\Tools\Binn\bcp.exe".
            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC0\Tools\Binn\bcp.exe",
                // BUG FIX: a local named pipe is addressed as \\.\pipe\<name>;
                // the original string was missing a leading backslash.
                $@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();
        }
        copyThread.Join();
    }
}
/// <summary>
/// Splits a UTF-16 LE byte stream after each CR LF pair (0D 00 0A 00).
/// A CRLF that straddles two Write buffers is not detected; the cut then simply
/// happens at the next CRLF falling wholly inside one buffer.
/// </summary>
class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        // Scan for the 4-byte little-endian sequence CR 00 LF 00.
        var last = offset + count - 4;
        for (var i = offset; i <= last; i++)
        {
            var crlf = buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0;
            if (crlf)
            {
                // Cut immediately after the CRLF pair.
                return (i - offset) + 4;
            }
        }
        return -1;
    }
}
/// <summary>
/// Splits a single-byte-encoded (UTF-8 / ANSI / ASCII / OEM) stream after each
/// CR LF pair. A CRLF spanning two Write buffers is not detected; the split then
/// occurs at the next CRLF contained within one buffer.
/// </summary>
class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        var last = offset + count - 2;
        for (var i = offset; i <= last; i++)
        {
            if (buffer[i] == '\r' && buffer[i + 1] == '\n')
            {
                // Split right after the CRLF.
                return (i - offset) + 2;
            }
        }
        return -1;
    }
}
/// <summary>
/// Write-only stream that forwards to a sequence of underlying streams created by
/// a factory, starting a new one whenever ParseDataGetCutPoint reports a safe cut
/// point after at least cutAfterPosition bytes were written to the current stream.
/// Not thread-safe.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;    // bytes written across all chunks
    private long CurrentStreamPos;  // bytes written to the current chunk
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    protected SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Only release managed state when disposing, and chain to the base class
        // (the original skipped both and would throw if CurrentStream were null).
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask the parser for a cut point once the current chunk is big enough.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // BUG FIX: the original added the full count to CurrentStreamPos after
            // resetting it, overstating the new chunk's size by cutPoint bytes and
            // making every subsequent chunk cut earlier than configured.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: quoted-string state
/// persists across Write calls, so a CRLF inside a quoted field is never treated
/// as a record separator.
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;  // currently inside a "..." field
    bool lastWasQuote;    // previous byte was '"' (needed to decode "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // An unquoted CRLF ends a record; remember the first such position.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // BUG FIX: cut AFTER the CRLF (n + 2), consistent with the other
                // splitting streams. The original cut before it, so the previous
                // chunk lost its trailing newline and the next chunk began with a
                // blank line.
                cutPoint = n + 2;
            }
            // Keep scanning: StepState must see every byte to stay in sync.
        }
        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // "" is either an escaped quote (inside a field) or an empty
                // field (outside) — either way, quoted state is unchanged.
            }
            else
            {
                // A quote followed by a non-quote toggles quoted-string state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
我开发了一个 CSV 批处理程序。 但与 BCP 相比,这个过程似乎相当缓慢。 我唯一的要求是将没有标识或主键列的大表导出到多个小型 CSV 文件中,并用相应的批次 ID 命名它们。
BCP 的问题是它只会写入一个大文件。
我目前的流程是: 读取数据并使用 CSV 写入器写入内存流 我一直在检查内存流是否大于特定的批处理大小,然后我将异步地复制内存流写入文本文件。
在没有内存不足的情况下,我可以批量导出 250MB 的文件
但是这个过程比BCP导出要多5倍的时间。
有没有比我现在做的更好的实现批量导出CSV的方法
请指教
我想到了几个选项:
使用 FETCH / OFFSET
如果源查询能够在 SQL 服务器中轻松地进行批处理(例如,您可以关闭的聚集索引),FETCH 和 OFFSET 基本上是免费的。
如果 table 是一个堆,FETCH/OFFSET 不是一个真正的选择,但您可以考虑添加聚集索引,因为没有太多好的理由反对这样做(尽管这样做100 GB table 会很贵 :)
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
使用 SqlDataReader
根据使用约 1.2 GB 表的测量结果,下面这个简单的 C# CSV SQL 导出实现,在相同的硬件和系统上达到了 BCP 约 75% 的性能。(它还有一个优势:能够正确处理 CSV 格式中嵌入的逗号、引号和 CRLF。)
static void Main(string[] args)
{
    // Hoisted out of the per-field loop: the original allocated this array once
    // per field per row, adding significant GC pressure on large exports (the
    // surrounding text notes the export is GC-bound).
    var mustQuote = new[] { '"', ',', '\r', '\n' };

    // using blocks so the connection/command/reader/file are released even on error
    // (the original never disposed the connection or command).
    using (var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;"))
    using (var sqr = new SqlCommand("SELECT * FROM dbo.Table", con))
    {
        con.Open();
        using (var reader = sqr.ExecuteReader())
        using (var tw = File.CreateText("out.csv"))
        {
            while (reader.Read())
            {
                for (int i = 0; i < reader.FieldCount; i++)
                {
                    if (i != 0)
                    {
                        tw.Write(',');
                    }
                    var val = FormatValue(reader[i]);
                    if (val == null)
                    {
                        // NULL -> empty (unquoted) field
                    }
                    else if (val.IndexOfAny(mustQuote) >= 0)
                    {
                        // RFC 4180 quoting: wrap the field and double embedded quotes
                        tw.Write('"');
                        tw.Write(val.Replace("\"", "\"\""));
                        tw.Write('"');
                    }
                    else
                    {
                        tw.Write(val);
                    }
                }
                tw.Write("\r\n");
            }
        }
    }
}
/// <summary>
/// Converts a data-reader cell value to its CSV text form, or null for NULL.
/// </summary>
private static string FormatValue(object v)
{
    // BUG FIX: SqlDataReader reports database NULL as DBNull.Value, never as a
    // CLR null, so the original null-only check could never match a NULL column.
    // Both now map to null (rendered as an empty field by the caller).
    if (v == null || v is DBNull)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        // Round-trip ("O") format: culture-invariant and lossless.
        return dt.ToString("O");
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        // Binary columns rendered as 0xHEX.
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        for (int i = 0; i < ba.Length; i++)
        {
            sb.Append(ba[i].ToString("X2"));
        }
        return sb.ToString();
    }
    // NOTE(review): v.ToString() uses the current culture for numeric types;
    // consider formatting with InvariantCulture for machine-stable output.
    return v.ToString();
}
性能似乎受到处理如此多字符串分配的 GC 的限制 - 因此,如果需要更高的性能,将其翻译成非 CLR 语言(例如 C++)可能会与 BCP 的性能相匹配。
使用 SSIS
SSIS 可以在一个包中执行所有步骤。确切的步骤可能最好留给另一个问题,但基本上相当于为“文件编号”合成一列并使用平面文件目标。 Bad example of this
使用 SSIS 生成一个大的 CSV,然后拆分它
如果您使用 SSIS(直接或通过 using the Export Data Wizard),您将获得一个可以拆分的符合 RFC 4180 标准的 CSV 文件。拆分此类文件的示例工具是:
class Program
{
    // Splits an RFC 4180 CSV into ~100 MB chunks, only ever cutting on a record
    // boundary (an unquoted CRLF).
    static void Main(string[] args)
    {
        var fileIndex = 0;
        using (var source = File.OpenRead("rfc4180_in.csv"))
        using (var splitter = new CsvRfc4180SplittingWriteStream(
            () => File.Create($"rfc4180_out{fileIndex++}.csv"),
            100 /* mb per chunk */ * 1024 * 1024))
        {
            source.CopyTo(splitter);
        }
    }
}
/// <summary>
/// Write-only stream that forwards to a sequence of underlying streams created by
/// a factory, starting a new one whenever ParseDataGetCutPoint reports a safe cut
/// point after at least cutAfterPosition bytes were written to the current stream.
/// Not thread-safe.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;    // bytes written across all chunks
    private long CurrentStreamPos;  // bytes written to the current chunk
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    protected SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Only release managed state when disposing, and chain to the base class
        // (the original skipped both and would throw if CurrentStream were null).
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask the parser for a cut point once the current chunk is big enough.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // BUG FIX: the original added the full count to CurrentStreamPos after
            // resetting it, overstating the new chunk's size by cutPoint bytes and
            // making every subsequent chunk cut earlier than configured.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: quoted-string state
/// persists across Write calls, so a CRLF inside a quoted field is never treated
/// as a record separator.
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;  // currently inside a "..." field
    bool lastWasQuote;    // previous byte was '"' (needed to decode "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // An unquoted CRLF ends a record; remember the first such position.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // BUG FIX: cut AFTER the CRLF (n + 2), consistent with the other
                // splitting streams. The original cut before it, so the previous
                // chunk lost its trailing newline and the next chunk began with a
                // blank line.
                cutPoint = n + 2;
            }
            // Keep scanning: StepState must see every byte to stay in sync.
        }
        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // "" is either an escaped quote (inside a field) or an empty
                // field (outside) — either way, quoted state is unchanged.
            }
            else
            {
                // A quote followed by a non-quote toggles quoted-string state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
使用 BCP,然后即时拆分
如果需要 BCP,并且它对 CSV 的(错误)处理是可以容忍的,它可以写入命名管道流以即时拆分。
class Program
{
    // Runs bcp.exe writing to a named pipe and splits the piped output into
    // ~100 MB files on CRLF boundaries as the data arrives.
    static void Main(string[] args)
    {
        Thread copyThread;
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires a read/write pipe
        using (var np = new NamedPipeServerStream(pipeId))
        {
            copyThread = new Thread(_1 =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF-16 Little Endian)
                // Use CrlfUtf8SplittingWriteStream otherwise (UTF-8 / ANSI / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();
            // NOTE(review): the bcp.exe path ("...\ODBC0\...") looks garbled —
            // confirm the actual install path, e.g. "...\ODBC\170\Tools\Binn\bcp.exe".
            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC0\Tools\Binn\bcp.exe",
                // BUG FIX: a local named pipe is addressed as \\.\pipe\<name>;
                // the original string was missing a leading backslash.
                $@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();
        }
        copyThread.Join();
    }
}
/// <summary>
/// Splits a UTF-16 LE byte stream after each CR LF pair (0D 00 0A 00).
/// A CRLF that straddles two Write buffers is not detected; the cut then simply
/// happens at the next CRLF falling wholly inside one buffer.
/// </summary>
class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        // Scan for the 4-byte little-endian sequence CR 00 LF 00.
        var last = offset + count - 4;
        for (var i = offset; i <= last; i++)
        {
            var crlf = buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0;
            if (crlf)
            {
                // Cut immediately after the CRLF pair.
                return (i - offset) + 4;
            }
        }
        return -1;
    }
}
/// <summary>
/// Splits a single-byte-encoded (UTF-8 / ANSI / ASCII / OEM) stream after each
/// CR LF pair. A CRLF spanning two Write buffers is not detected; the split then
/// occurs at the next CRLF contained within one buffer.
/// </summary>
class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (!getCutPoint)
        {
            return -1;
        }
        var last = offset + count - 2;
        for (var i = offset; i <= last; i++)
        {
            if (buffer[i] == '\r' && buffer[i + 1] == '\n')
            {
                // Split right after the CRLF.
                return (i - offset) + 2;
            }
        }
        return -1;
    }
}
/// <summary>
/// Write-only stream that forwards to a sequence of underlying streams created by
/// a factory, starting a new one whenever ParseDataGetCutPoint reports a safe cut
/// point after at least cutAfterPosition bytes were written to the current stream.
/// Not thread-safe.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;    // bytes written across all chunks
    private long CurrentStreamPos;  // bytes written to the current chunk
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    protected SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;
        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        // Only release managed state when disposing, and chain to the base class
        // (the original skipped both and would throw if CurrentStream were null).
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask the parser for a cut point once the current chunk is big enough.
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }
            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // BUG FIX: the original added the full count to CurrentStreamPos after
            // resetting it, overstating the new chunk's size by cutPoint bytes and
            // making every subsequent chunk cut earlier than configured.
            CurrentStreamPos = count - cutPoint;
        }
        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream Write-only stubs
    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }
    #endregion
}
/// <summary>
/// Splits an RFC 4180 CSV byte stream on record boundaries: quoted-string state
/// persists across Write calls, so a CRLF inside a quoted field is never treated
/// as a record separator.
/// </summary>
class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;  // currently inside a "..." field
    bool lastWasQuote;    // previous byte was '"' (needed to decode "" escapes)

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);
            // An unquoted CRLF ends a record; remember the first such position.
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // BUG FIX: cut AFTER the CRLF (n + 2), consistent with the other
                // splitting streams. The original cut before it, so the previous
                // chunk lost its trailing newline and the next chunk began with a
                // blank line.
                cutPoint = n + 2;
            }
            // Keep scanning: StepState must see every byte to stay in sync.
        }
        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;
            if (isQuote)
            {
                // "" is either an escaped quote (inside a field) or an empty
                // field (outside) — either way, quoted state is unchanged.
            }
            else
            {
                // A quote followed by a non-quote toggles quoted-string state.
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}