每当读取记录时,TransformBlock<TInput, TOutput> 中 InputQueue 的每个元素都会被覆盖
Each element of InputQueue in TransformBlock<TInput, TOutput> is overwritten whenever a record is read
目的
我正在尝试创建一个管道,我一次从文件中读取一个字节记录并将其发送到“BufferBlock”,它将项目附加到缓冲区块中。这通过简单的 LinkTo () 方法链接到 TransformBlock ,它将每个字节记录转换为 MyObject 对象。
下面是执行所有这些操作的整个方法:
BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
TransformBlock<byte[], MyObject> transform = new TransformBlock<byte[], MyObject>(bytes =>
{
return FromBytesTOMyObject(bytes);
});
private void ReadFileAndAppend()
{
buffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
BinaryReader br = new BinaryReader(new FileStream("C:\Users\MyUser\myFile.raw",FileMode.Open, FileAccess.Read, FileShare.Read));
int count;
byte[] record = new byte[4000];
// Post more messages to the block.
Task.Run(async () =>
{
while ((count = br.Read(record, 0, record.Length)) != 0)
await buffer.SendAsync(record);
buffer.Complete();
});
transform.Completion.Wait();
Console.WriteLine("");
在 TransformBlock 内部调用的方法下方:
static public MyObject FromBytesToMyObject(byte[] record)
{
MyObject object = new MyObject();
object.counter = BitConverter.ToInt32(record, 0);
object.nPoints = BitConverter.ToInt32(record, 4);
for (int i = 0; i < object.nPoints; i++)
{
int index = i * 4;
object.A[i] = BitConverter.ToSingle(record, index + 8);
}
return object;
}
从FromBytesToMyObject()方法可以看出,每条读取的记录里面都有一个计数器。 所以每条记录永远不会有一个等于另一条记录的计数器(我也检查了一个字节reader像HxD)
问题
有了这个设置,我认为文件的读取和解释很顺利。但是在读取大约 50 条记录或更多记录后进入调试并在“while”中插入断点,我注意到在 TransformBlock 的 OutputQueue 中
具有相同计数器的记录组排队,因此记录相同。
示例:
仅考虑计数器的精确队列:
1,2,3,4,5,6,7,8,9,10.
我在 OutputQueue 中实际看到的队列:1,2,2,2,3,3,3,3,4,4,5,5 ....
你能告诉我我错在哪里吗?
您一遍又一遍地重复使用相同的 byte[] record
,没有任何线程安全考虑。难怪事情进展得如此之快。如果要保证整个操作的正确性,必须每次都使用不同的缓冲区:
while (true)
{
var record = new byte[4000];
var count = binaryReader.Read(record, 0, record.Length);
if (count == 0) break;
Array.Resize(ref record, count);
await buffer.SendAsync(record);
}
buffer.Complete();
如果你也关心性能并且不想让垃圾收集器负担过重,你应该看看 ArrayPool
class。但要小心,因为这 class 提供了搬起石头砸自己脚的新方法。
目的
我正在尝试创建一个管道,我一次从文件中读取一个字节记录并将其发送到“BufferBlock”,它将项目附加到缓冲区块中。这通过简单的 LinkTo () 方法链接到 TransformBlock
BufferBlock<byte[]> buffer = new BufferBlock<byte[]>();
TransformBlock<byte[], MyObject> transform = new TransformBlock<byte[], MyObject>(bytes =>
{
return FromBytesTOMyObject(bytes);
});
private void ReadFileAndAppend()
{
buffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
BinaryReader br = new BinaryReader(new FileStream("C:\Users\MyUser\myFile.raw",FileMode.Open, FileAccess.Read, FileShare.Read));
int count;
byte[] record = new byte[4000];
// Post more messages to the block.
Task.Run(async () =>
{
while ((count = br.Read(record, 0, record.Length)) != 0)
await buffer.SendAsync(record);
buffer.Complete();
});
transform.Completion.Wait();
Console.WriteLine("");
在 TransformBlock 内部调用的方法下方:
static public MyObject FromBytesToMyObject(byte[] record)
{
MyObject object = new MyObject();
object.counter = BitConverter.ToInt32(record, 0);
object.nPoints = BitConverter.ToInt32(record, 4);
for (int i = 0; i < object.nPoints; i++)
{
int index = i * 4;
object.A[i] = BitConverter.ToSingle(record, index + 8);
}
return object;
}
从FromBytesToMyObject()方法可以看出,每条读取的记录里面都有一个计数器。 所以每条记录永远不会有一个等于另一条记录的计数器(我也检查了一个字节reader像HxD)
问题
有了这个设置,我认为文件的读取和解释很顺利。但是在读取大约 50 条记录或更多记录后进入调试并在“while”中插入断点,我注意到在 TransformBlock 的 OutputQueue 中 具有相同计数器的记录组排队,因此记录相同。
示例:
仅考虑计数器的精确队列: 1,2,3,4,5,6,7,8,9,10.
我在 OutputQueue 中实际看到的队列:1,2,2,2,3,3,3,3,4,4,5,5 ....
你能告诉我我错在哪里吗?
您一遍又一遍地重复使用相同的 byte[] record
,没有任何线程安全考虑。难怪事情进展得如此之快。如果要保证整个操作的正确性,必须每次都使用不同的缓冲区:
while (true)
{
var record = new byte[4000];
var count = binaryReader.Read(record, 0, record.Length);
if (count == 0) break;
Array.Resize(ref record, count);
await buffer.SendAsync(record);
}
buffer.Complete();
如果你也关心性能并且不想让垃圾收集器负担过重,你应该看看 ArrayPool
class。但要小心,因为这 class 提供了搬起石头砸自己脚的新方法。