MemoryStream - 数据被截断

MemoryStream - Data getting truncated

原始问题 - CSV 文件太大 (700k) 条记录 - 因此希望从那个大的 CSV 文件创建较小的 CSV 文件。

获得以下代码来剖析文件并创建更小的文件。

 private async Task SplitFile(List<CsvRow> rows, string name)
    {
        var numRows = 30000;

        var remainder = rows.Count() % numRows;
        var chunks = rows.Count() / numRows;

        if (remainder > 0)
        {
            chunks++;
        }

        // Iterate rows in chunks
        for (var row = 0; row < chunks; row++)
        {
            // Extract chunks using LINQ
            var fileRows = rows
                .Skip(row * numRows)
                .Take(numRows)
                .ToList();

            var outputPath = Path.Combine(@"c:\", $"file{row}.txt");
            var encoding = new UTF8Encoding(true);
            
            await using var mem = new MemoryStream();
            await using var fileWriter = new StreamWriter(outputPath, false, encoding);
            await using var writer = new StreamWriter(mem, encoding);
            await using var csvBlob = new CsvWriter(writer, CultureInfo.InvariantCulture);
            await using var csvFile = new CsvWriter(fileWriter, CultureInfo.InvariantCulture);

            await csvFile.WriteRecordsAsync(fileRows);
            await csvBlob.WriteRecordsAsync(fileRows);

            FileStream file = new FileStream(@$"c:\memfile{row}.txt", FileMode.Create,
                FileAccess.Write);
            mem.WriteTo(file);
            file.Close();
        }
    }

Blocker - 我正在从 Azure Blob 容器下载原始大文件,并在创建小块后将它们上传回 Blob 容器。为此,我需要在 MemoryStreams 中获取数据。

我创建物理文件只是为了解决内存流问题。更容易调试。

当我 运行 上面的代码 - 小块文件被创建。您会注意到我正在创建两组文件(块)

首先,直接将数据写入文件流,其次,使用我创建的MemoryStream。

我在通过直接写入文件流创建的文件中获得了 30000 条记录,但在第二个文件中我只获得了 29889 条记录。

我尝试了所有方法,但我无法在使用 MemoryStream 时立即获取所有 30000 条记录。

我刷新了流,摆弄了编码但没有任何帮助。我阅读了有关带 BOM 的 UTF8 的信息。看起来很有希望,但又无法解决。

我正在使用 Dot Net Core 3.1

MemoryStream 是否存在已知问题。为什么它会丢失最后几条记录?其余文件相同。

有什么想法吗?

谢谢

正如我在上面评论的那样,解决方法是在复制 MemoryStream 之前在 CsvWriter 上调用 Flush。问题是 CsvWriter 内部缓冲区中仍有未决数据,在您 Flush 之前不会将其复制到 MemoryStream。那应该会让事情为你工作。

但是,我对您的场景有更深入的反馈。在处理批次之前,您似乎正在将整个 700K 文件读入 List<CsvRow>。更好的方法是从 Azure 流式传输 CSV 数据,在您阅读它时将较小的批次发送回 Azure。

在此示例中,我将使用我自己的库 (Sylvan.Data.Csv),但我确信 CsvHelper 提供了类似的功能。

using Sylvan.Data.Csv;

...

string name = "MyMassiveCsv";
TextReader reader = File.OpenText(name + ".csv");

// replace above with however you access your Azure blob streams.

CsvDataReader csv = await CsvDataReader.CreateAsync(reader);

RangeDataReader r;
int i = 0;
do
{
    r = new RangeDataReader(csv, 30000);
    i++;

    using var writer = File.CreateText(name + i + ".csv");
    // using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
    using var w = CsvDataWriter.Create(writer);

    await w.WriteAsync(r);
} while (!r.AtEndOfData);

这样您一次只需要在内存中保存少量 CSV 文件,然后您将立即开始发回批次,而不必下载首先是整个 CSV。

RangeDataReader 是一个 DbDataReader 实现,它包装了 DbDataReader 并限制了它从底层 reader 读取的行数。实现如下:

using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

class RangeDataReader : DbDataReader
{
    readonly DbDataReader reader;

    int row = -1;
    int count;

    public RangeDataReader(DbDataReader dataReader, int count)
    {
        this.reader = dataReader;
        this.count = count;
    }


    public bool AtEndOfData { get; private set; }

    public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
    {
        if (row < count)
        {
            row++;
            var r = await reader.ReadAsync(cancellationToken);
            if (!r)
            {
                AtEndOfData = r;
            }
            return r;
        }
        return false;
    }

    public override bool Read()
    {
        if (row < count)
        {
            row++;
            var r = reader.Read();
            if (!r)
            {
                AtEndOfData = r;
            }
            return r;
        }
        return false;
    }

    public override object this[int ordinal] => this.GetValue(ordinal);

    public override object this[string name] => this.GetValue(GetOrdinal(name));

    public override int Depth => 0;

    public override int FieldCount => reader.FieldCount;

    public override bool HasRows => reader.HasRows;

    public override bool IsClosed => reader.IsClosed;

    public override int RecordsAffected => reader.RecordsAffected;


    public override bool GetBoolean(int ordinal)
    {
        return reader.GetBoolean(ordinal);
    }

    public override byte GetByte(int ordinal)
    {
        return reader.GetByte(ordinal);
    }

    public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
    {
        return reader.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);
    }

    public override char GetChar(int ordinal)
    {
        return reader.GetChar(ordinal);
    }

    public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
    {
        return reader.GetChars(ordinal, dataOffset, buffer, bufferOffset, length);
    }

    public override string GetDataTypeName(int ordinal)
    {
        return reader.GetDataTypeName(ordinal);
    }

    public override DateTime GetDateTime(int ordinal)
    {
        return reader.GetDateTime(ordinal);
    }

    public override decimal GetDecimal(int ordinal)
    {
        return reader.GetDecimal(ordinal);
    }

    public override double GetDouble(int ordinal)
    {
        return reader.GetDouble(ordinal);
    }

    public override IEnumerator GetEnumerator()
    {
        return new DbEnumerator(this);
    }

    public override Type GetFieldType(int ordinal)
    {
        return reader.GetFieldType(ordinal);
    }

    public override float GetFloat(int ordinal)
    {
        return reader.GetFloat(ordinal);
    }

    public override Guid GetGuid(int ordinal)
    {
        return reader.GetGuid(ordinal);
    }

    public override short GetInt16(int ordinal)
    {
        return reader.GetInt16(ordinal);
    }

    public override int GetInt32(int ordinal)
    {
        return reader.GetInt32(ordinal);
    }

    public override long GetInt64(int ordinal)
    {
        return reader.GetInt64(ordinal);
    }

    public override string GetName(int ordinal)
    {
        return reader.GetName(ordinal);
    }

    public override int GetOrdinal(string name)
    {
        return reader.GetOrdinal(name);
    }

    public override string GetString(int ordinal)
    {
        return reader.GetString(ordinal);
    }

    public override object GetValue(int ordinal)
    {
        return reader.GetValue(ordinal);
    }

    public override int GetValues(object[] values)
    {
        return reader.GetValues(values);
    }

    public override bool IsDBNull(int ordinal)
    {
        return reader.IsDBNull(ordinal);
    }

    public override bool NextResult()
    {
        throw new NotSupportedException();
    }
}


几乎所有内容都委托给内部数据reader。唯一有趣的位是 Read/ReadAsync,它限制了它将读取的行数。我还没有彻底测试这段代码,现在看看它,我可能会在它将读取的行数上相差一个。

最后,既然我已经说明了如何对 CSV 数据进行流式处理,也许拆分的需求消失了,您可以简单地对文件进行流式处理而不需要拆分它?很难知道为什么你觉得你需要拆分它。