MemoryStream - 数据被截断
MemoryStream - Data getting truncated
原始问题 - CSV 文件太大 (700k) 条记录 - 因此希望从那个大的 CSV 文件创建较小的 CSV 文件。
获得以下代码来剖析文件并创建更小的文件。
private async Task SplitFile(List<CsvRow> rows, string name)
{
var numRows = 30000;
var remainder = rows.Count() % numRows;
var chunks = rows.Count() / numRows;
if (remainder > 0)
{
chunks++;
}
// Iterate rows in chunks
for (var row = 0; row < chunks; row++)
{
// Extract chunks using LINQ
var fileRows = rows
.Skip(row * numRows)
.Take(numRows)
.ToList();
var outputPath = Path.Combine(@"c:\", $"file{row}.txt");
var encoding = new UTF8Encoding(true);
await using var mem = new MemoryStream();
await using var fileWriter = new StreamWriter(outputPath, false, encoding);
await using var writer = new StreamWriter(mem, encoding);
await using var csvBlob = new CsvWriter(writer, CultureInfo.InvariantCulture);
await using var csvFile = new CsvWriter(fileWriter, CultureInfo.InvariantCulture);
await csvFile.WriteRecordsAsync(fileRows);
await csvBlob.WriteRecordsAsync(fileRows);
FileStream file = new FileStream(@$"c:\memfile{row}.txt", FileMode.Create,
FileAccess.Write);
mem.WriteTo(file);
file.Close();
}
}
Blocker - 我正在从 Azure Blob 容器下载原始大文件,并在创建小块后将它们上传回 Blob 容器。为此,我需要在 MemoryStreams 中获取数据。
我创建物理文件只是为了解决内存流问题。更容易调试。
当我 运行 上面的代码 - 小块文件被创建。您会注意到我正在创建两组文件(块)
首先,直接将数据写入文件流,其次,使用我创建的MemoryStream。
我在通过直接写入文件流创建的文件中获得了 30000 条记录,但在第二个文件中我只获得了 29889 条记录。
我尝试了所有方法,但我无法在使用 MemoryStream 时立即获取所有 30000 条记录。
我刷新了流,摆弄了编码但没有任何帮助。我阅读了有关带 BOM 的 UTF8 的信息。看起来很有希望,但又无法解决。
我正在使用 Dot Net Core 3.1
MemoryStream 是否存在已知问题。为什么它会丢失最后几条记录?其余文件相同。
有什么想法吗?
谢谢
正如我在上面评论的那样,解决方法是在复制 MemoryStream
之前在 CsvWriter
上调用 Flush
。问题是 CsvWriter
内部缓冲区中仍有未决数据,在您 Flush
之前不会将其复制到 MemoryStream
。那应该会让事情为你工作。
但是,我对您的场景有更深入的反馈。在处理批次之前,您似乎正在将整个 700K 文件读入 List<CsvRow>
。更好的方法是从 Azure 流式传输 CSV 数据,在您阅读它时将较小的批次发送回 Azure。
在此示例中,我将使用我自己的库 (Sylvan.Data.Csv),但我确信 CsvHelper 提供了类似的功能。
using Sylvan.Data.Csv;
...
string name = "MyMassiveCsv";
TextReader reader = File.OpenText(name + ".csv");
// replace above with however you access your Azure blob streams.
CsvDataReader csv = await CsvDataReader.CreateAsync(reader);
RangeDataReader r;
int i = 0;
do
{
r = new RangeDataReader(csv, 30000);
i++;
using var writer = File.CreateText(name + i + ".csv");
// using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
using var w = CsvDataWriter.Create(writer);
await w.WriteAsync(r);
} while (!r.AtEndOfData);
这样您一次只需要在内存中保存少量 CSV 文件,然后您将立即开始发回批次,而不必下载首先是整个 CSV。
RangeDataReader
是一个 DbDataReader
实现,它包装了 DbDataReader
并限制了它从底层 reader 读取的行数。实现如下:
using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
class RangeDataReader : DbDataReader
{
readonly DbDataReader reader;
int row = -1;
int count;
public RangeDataReader(DbDataReader dataReader, int count)
{
this.reader = dataReader;
this.count = count;
}
public bool AtEndOfData { get; private set; }
public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
if (row < count)
{
row++;
var r = await reader.ReadAsync(cancellationToken);
if (!r)
{
AtEndOfData = r;
}
return r;
}
return false;
}
public override bool Read()
{
if (row < count)
{
row++;
var r = reader.Read();
if (!r)
{
AtEndOfData = r;
}
return r;
}
return false;
}
public override object this[int ordinal] => this.GetValue(ordinal);
public override object this[string name] => this.GetValue(GetOrdinal(name));
public override int Depth => 0;
public override int FieldCount => reader.FieldCount;
public override bool HasRows => reader.HasRows;
public override bool IsClosed => reader.IsClosed;
public override int RecordsAffected => reader.RecordsAffected;
public override bool GetBoolean(int ordinal)
{
return reader.GetBoolean(ordinal);
}
public override byte GetByte(int ordinal)
{
return reader.GetByte(ordinal);
}
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
return reader.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);
}
public override char GetChar(int ordinal)
{
return reader.GetChar(ordinal);
}
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
return reader.GetChars(ordinal, dataOffset, buffer, bufferOffset, length);
}
public override string GetDataTypeName(int ordinal)
{
return reader.GetDataTypeName(ordinal);
}
public override DateTime GetDateTime(int ordinal)
{
return reader.GetDateTime(ordinal);
}
public override decimal GetDecimal(int ordinal)
{
return reader.GetDecimal(ordinal);
}
public override double GetDouble(int ordinal)
{
return reader.GetDouble(ordinal);
}
public override IEnumerator GetEnumerator()
{
return new DbEnumerator(this);
}
public override Type GetFieldType(int ordinal)
{
return reader.GetFieldType(ordinal);
}
public override float GetFloat(int ordinal)
{
return reader.GetFloat(ordinal);
}
public override Guid GetGuid(int ordinal)
{
return reader.GetGuid(ordinal);
}
public override short GetInt16(int ordinal)
{
return reader.GetInt16(ordinal);
}
public override int GetInt32(int ordinal)
{
return reader.GetInt32(ordinal);
}
public override long GetInt64(int ordinal)
{
return reader.GetInt64(ordinal);
}
public override string GetName(int ordinal)
{
return reader.GetName(ordinal);
}
public override int GetOrdinal(string name)
{
return reader.GetOrdinal(name);
}
public override string GetString(int ordinal)
{
return reader.GetString(ordinal);
}
public override object GetValue(int ordinal)
{
return reader.GetValue(ordinal);
}
public override int GetValues(object[] values)
{
return reader.GetValues(values);
}
public override bool IsDBNull(int ordinal)
{
return reader.IsDBNull(ordinal);
}
public override bool NextResult()
{
throw new NotSupportedException();
}
}
几乎所有内容都委托给内部数据reader。唯一有趣的位是 Read/ReadAsync
,它限制了它将读取的行数。我还没有彻底测试这段代码,现在看看它,我可能会在它将读取的行数上相差一个。
最后,既然我已经说明了如何对 CSV 数据进行流式处理,也许拆分的需求消失了,您可以简单地对文件进行流式处理而不需要拆分它?很难知道为什么你觉得你需要拆分它。
原始问题 - CSV 文件太大 (700k) 条记录 - 因此希望从那个大的 CSV 文件创建较小的 CSV 文件。
获得以下代码来剖析文件并创建更小的文件。
private async Task SplitFile(List<CsvRow> rows, string name)
{
var numRows = 30000;
var remainder = rows.Count() % numRows;
var chunks = rows.Count() / numRows;
if (remainder > 0)
{
chunks++;
}
// Iterate rows in chunks
for (var row = 0; row < chunks; row++)
{
// Extract chunks using LINQ
var fileRows = rows
.Skip(row * numRows)
.Take(numRows)
.ToList();
var outputPath = Path.Combine(@"c:\", $"file{row}.txt");
var encoding = new UTF8Encoding(true);
await using var mem = new MemoryStream();
await using var fileWriter = new StreamWriter(outputPath, false, encoding);
await using var writer = new StreamWriter(mem, encoding);
await using var csvBlob = new CsvWriter(writer, CultureInfo.InvariantCulture);
await using var csvFile = new CsvWriter(fileWriter, CultureInfo.InvariantCulture);
await csvFile.WriteRecordsAsync(fileRows);
await csvBlob.WriteRecordsAsync(fileRows);
FileStream file = new FileStream(@$"c:\memfile{row}.txt", FileMode.Create,
FileAccess.Write);
mem.WriteTo(file);
file.Close();
}
}
Blocker - 我正在从 Azure Blob 容器下载原始大文件,并在创建小块后将它们上传回 Blob 容器。为此,我需要在 MemoryStreams 中获取数据。
我创建物理文件只是为了解决内存流问题。更容易调试。
当我 运行 上面的代码 - 小块文件被创建。您会注意到我正在创建两组文件(块)
首先,直接将数据写入文件流,其次,使用我创建的MemoryStream。
我在通过直接写入文件流创建的文件中获得了 30000 条记录,但在第二个文件中我只获得了 29889 条记录。
我尝试了所有方法,但我无法在使用 MemoryStream 时立即获取所有 30000 条记录。
我刷新了流,摆弄了编码但没有任何帮助。我阅读了有关带 BOM 的 UTF8 的信息。看起来很有希望,但又无法解决。
我正在使用 Dot Net Core 3.1
MemoryStream 是否存在已知问题。为什么它会丢失最后几条记录?其余文件相同。
有什么想法吗?
谢谢
正如我在上面评论的那样,解决方法是在复制 MemoryStream
之前在 CsvWriter
上调用 Flush
。问题是 CsvWriter
内部缓冲区中仍有未决数据,在您 Flush
之前不会将其复制到 MemoryStream
。那应该会让事情为你工作。
但是,我对您的场景有更深入的反馈。在处理批次之前,您似乎正在将整个 700K 文件读入 List<CsvRow>
。更好的方法是从 Azure 流式传输 CSV 数据,在您阅读它时将较小的批次发送回 Azure。
在此示例中,我将使用我自己的库 (Sylvan.Data.Csv),但我确信 CsvHelper 提供了类似的功能。
using Sylvan.Data.Csv;
...
string name = "MyMassiveCsv";
TextReader reader = File.OpenText(name + ".csv");
// replace above with however you access your Azure blob streams.
CsvDataReader csv = await CsvDataReader.CreateAsync(reader);
RangeDataReader r;
int i = 0;
do
{
r = new RangeDataReader(csv, 30000);
i++;
using var writer = File.CreateText(name + i + ".csv");
// using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
using var w = CsvDataWriter.Create(writer);
await w.WriteAsync(r);
} while (!r.AtEndOfData);
这样您一次只需要在内存中保存少量 CSV 文件,然后您将立即开始发回批次,而不必下载首先是整个 CSV。
RangeDataReader
是一个 DbDataReader
实现,它包装了 DbDataReader
并限制了它从底层 reader 读取的行数。实现如下:
using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
class RangeDataReader : DbDataReader
{
readonly DbDataReader reader;
int row = -1;
int count;
public RangeDataReader(DbDataReader dataReader, int count)
{
this.reader = dataReader;
this.count = count;
}
public bool AtEndOfData { get; private set; }
public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
{
if (row < count)
{
row++;
var r = await reader.ReadAsync(cancellationToken);
if (!r)
{
AtEndOfData = r;
}
return r;
}
return false;
}
public override bool Read()
{
if (row < count)
{
row++;
var r = reader.Read();
if (!r)
{
AtEndOfData = r;
}
return r;
}
return false;
}
public override object this[int ordinal] => this.GetValue(ordinal);
public override object this[string name] => this.GetValue(GetOrdinal(name));
public override int Depth => 0;
public override int FieldCount => reader.FieldCount;
public override bool HasRows => reader.HasRows;
public override bool IsClosed => reader.IsClosed;
public override int RecordsAffected => reader.RecordsAffected;
public override bool GetBoolean(int ordinal)
{
return reader.GetBoolean(ordinal);
}
public override byte GetByte(int ordinal)
{
return reader.GetByte(ordinal);
}
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
return reader.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);
}
public override char GetChar(int ordinal)
{
return reader.GetChar(ordinal);
}
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
return reader.GetChars(ordinal, dataOffset, buffer, bufferOffset, length);
}
public override string GetDataTypeName(int ordinal)
{
return reader.GetDataTypeName(ordinal);
}
public override DateTime GetDateTime(int ordinal)
{
return reader.GetDateTime(ordinal);
}
public override decimal GetDecimal(int ordinal)
{
return reader.GetDecimal(ordinal);
}
public override double GetDouble(int ordinal)
{
return reader.GetDouble(ordinal);
}
public override IEnumerator GetEnumerator()
{
return new DbEnumerator(this);
}
public override Type GetFieldType(int ordinal)
{
return reader.GetFieldType(ordinal);
}
public override float GetFloat(int ordinal)
{
return reader.GetFloat(ordinal);
}
public override Guid GetGuid(int ordinal)
{
return reader.GetGuid(ordinal);
}
public override short GetInt16(int ordinal)
{
return reader.GetInt16(ordinal);
}
public override int GetInt32(int ordinal)
{
return reader.GetInt32(ordinal);
}
public override long GetInt64(int ordinal)
{
return reader.GetInt64(ordinal);
}
public override string GetName(int ordinal)
{
return reader.GetName(ordinal);
}
public override int GetOrdinal(string name)
{
return reader.GetOrdinal(name);
}
public override string GetString(int ordinal)
{
return reader.GetString(ordinal);
}
public override object GetValue(int ordinal)
{
return reader.GetValue(ordinal);
}
public override int GetValues(object[] values)
{
return reader.GetValues(values);
}
public override bool IsDBNull(int ordinal)
{
return reader.IsDBNull(ordinal);
}
public override bool NextResult()
{
throw new NotSupportedException();
}
}
几乎所有内容都委托给内部数据reader。唯一有趣的位是 Read/ReadAsync
,它限制了它将读取的行数。我还没有彻底测试这段代码,现在看看它,我可能会在它将读取的行数上相差一个。
最后,既然我已经说明了如何对 CSV 数据进行流式处理,也许拆分的需求消失了,您可以简单地对文件进行流式处理而不需要拆分它?很难知道为什么你觉得你需要拆分它。