每当读取记录时,TransformBlock<TInput, TOutput> 中 InputQueue 的每个元素都会被覆盖

Each element of InputQueue in TransformBlock<TInput, TOutput> is overwritten whenever a record is read

目的

我正在尝试创建一个管道,我一次从文件中读取一个字节记录并将其发送到“BufferBlock”,它将项目附加到缓冲区块中。这通过简单的 LinkTo () 方法链接到 TransformBlock ,它将每个字节记录转换为 MyObject 对象。 下面是执行所有这些操作的整个方法:

    BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
    TransformBlock<byte[], MyObject> transform = new TransformBlock<byte[], MyObject>(bytes =>
    {
        return FromBytesTOMyObject(bytes);
    });

    private void ReadFileAndAppend()
    {
        buffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });

        BinaryReader br = new BinaryReader(new FileStream("C:\Users\MyUser\myFile.raw",FileMode.Open, FileAccess.Read, FileShare.Read));                                  
        int count;
        byte[] record = new byte[4000];

        // Post more messages to the block.
        Task.Run(async () =>
        {
            while ((count = br.Read(record, 0, record.Length)) != 0)
                await buffer.SendAsync(record);
            buffer.Complete();
        });
        transform.Completion.Wait();
        Console.WriteLine("");

在 TransformBlock 内部调用的方法下方:

static public MyObject FromBytesToMyObject(byte[] record)
    {
        MyObject object = new MyObject();
        object.counter = BitConverter.ToInt32(record, 0);
        object.nPoints = BitConverter.ToInt32(record, 4);

        for (int i = 0; i < object.nPoints; i++)
        {
            int index = i * 4;
            object.A[i] = BitConverter.ToSingle(record, index + 8);
        }
        return object;
    }

从FromBytesToMyObject()方法可以看出,每条读取的记录里面都有一个计数器。 所以每条记录永远不会有一个等于另一条记录的计数器(我也检查了一个字节reader像HxD)

问题

有了这个设置,我认为文件的读取和解释很顺利。但是在读取大约 50 条记录或更多记录后进入调试并在“while”中插入断点,我注意到在 TransformBlock 的 OutputQueue 中 具有相同计数器的记录组排队,因此记录相同。

示例:

仅考虑计数器的精确队列: 1,2,3,4,5,6,7,8,9,10.

我在 OutputQueue 中实际看到的队列:1,2,2,2,3,3,3,3,4,4,5,5 ....

你能告诉我我错在哪里吗?

您一遍又一遍地重复使用相同的 byte[] record,没有任何线程安全考虑。难怪事情进展得如此之快。如果要保证整个操作的正确性,必须每次都使用不同的缓冲区:

while (true)
{
    var record = new byte[4000];
    var count = binaryReader.Read(record, 0, record.Length);
    if (count == 0) break;
    Array.Resize(ref record, count);
    await buffer.SendAsync(record);
}
buffer.Complete();

如果你也关心性能并且不想让垃圾收集器负担过重,你应该看看 ArrayPool class。但要小心,因为这 class 提供了搬起石头砸自己脚的新方法。