在 C# 中实现线程以进行快速、批量和连续读取的最佳实践?
Best practices for implementing a thread to do fast, bulk, and continuous reading in C#?
在 .NET 4.0 中应如何处理在 C# 中从设备读取批量数据?具体来说,我需要从 USB HID 设备中快速读取,该设备发出超过 26 个数据包的报告,其中必须保留顺序。
我试过在 BackgroundWorker 线程中执行此操作。它一次从设备读取一个数据包,并在读取更多数据之前对其进行处理。这提供了相当好的响应时间,但它很容易丢失一个数据包,并且单个数据包读取的开销成本加起来。
while (!( sender as BackgroundWorker ).CancellationPending) {
//read a single packet
//check for header or footer
//process packet data
}
}
在 C# 中读取这样的设备的最佳做法是什么?
背景:
我的 USB HID 设备不断报告大量数据。数据分为 26 个数据包,我必须保留顺序。不幸的是,该设备只标记每个报告中的第一个和最后一个数据包,因此我需要能够捕获其间的所有其他数据包。
对于 .Net 4,您可以使用 BlockingCollection
to provide a threadsafe queue that can be used by a producer and a consumer. The BlockingCollection.GetConsumingEnumerable()
method provides an enumerator which automatically terminates when the queue has been marked as completed using CompleteAdding()
并且是空的。
这是一些示例代码。在此示例中,有效负载是一个整数数组,但当然您可以使用您需要的任何数据类型。
请注意,对于您的具体示例,您可以使用 the overload of GetConsumingEnumerable()
which accepts an argument of type CancellationToken
。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BlockingCollection<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue);
Console.WriteLine("Finished.");
}
private static void consume(BlockingCollection<int[]> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
{
Console.WriteLine("Consuming " + item[0]);
Thread.Sleep(25);
}
}
private static void produce(BlockingCollection<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Add(payload);
Thread.Sleep(20);
}
queue.CompleteAdding();
}
}
}
对于 .Net 4.5 及更高版本,您可以使用 Microsoft's Task Parallel Library 中的更高级别 类,它具有丰富的功能(乍一看可能有些令人生畏)。
下面是使用 TPL DataFlow 的相同示例:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BufferBlock<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue).Wait();
Console.WriteLine("Finished.");
}
private static async Task consume(BufferBlock<int[]> queue)
{
while (await queue.OutputAvailableAsync())
{
var payload = await queue.ReceiveAsync();
Console.WriteLine("Consuming " + payload[0]);
await Task.Delay(25);
}
}
private static void produce(BufferBlock<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Post(payload);
Thread.Sleep(20);
}
queue.Complete();
}
}
}
如果丢失数据包是一个问题,请不要在同一个线程上进行处理和读取。从 .NET 4.0 开始,他们添加了 System.Collections.Concurrent
命名空间,这使得这很容易做到。您所需要的只是一个 BlockingCollection
,它充当传入数据包的队列。
BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
while (!( sender as BackgroundWorker ).CancellationPending)
{
Packet packet = GetPacket();
_queuedPackets.Add(packet);
}
_queuedPackets.CompleteAdding();
}
void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
List<Packet> report = new List<Packet>();
foreach(var packet in _queuedPackets.GetConsumingEnumerable())
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
当 _queuedPackets
为空时 _queuedPackets.GetConsumingEnumerable()
将阻塞线程而不消耗任何资源。一旦数据包到达,它将解除阻塞并执行 foreach 的下一次迭代。
当您调用 _queuedPackets.CompleteAdding();
时,处理线程上的 foreach 将 运行 直到集合为空,然后退出 foreach 循环。如果您不希望它在取消时变为 "finish up the queue",您可以轻松地将其更改为提前退出。我也打算改用 Tasks 而不是 Background worker,因为它使参数的传递更容易。
void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
Packet packet = GetPacket();
queue.Add(packet);
}
queue.CompleteAdding();
}
void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
List<Packet> report = new List<Packet>();
try
{
foreach(var packet in queue.GetConsumingEnumerable(token))
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
catch(OperationCanceledException)
{
//Do nothing, we don't care that it happened.
}
}
//This would replace your backgroundWorker.RunWorkerAsync() calls;
private void StartUpLoops()
{
var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
var cancelRead = new CancellationTokenSource();
var cancelProcess = new CancellationTokenSource();
Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token));
Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token));
//You can stop each loop indpendantly by calling cancelRead.Cancel() or cancelProcess.Cancel()
}
在 .NET 4.0 中应如何处理在 C# 中从设备读取批量数据?具体来说,我需要从 USB HID 设备中快速读取,该设备发出超过 26 个数据包的报告,其中必须保留顺序。
我试过在 BackgroundWorker 线程中执行此操作。它一次从设备读取一个数据包,并在读取更多数据之前对其进行处理。这提供了相当好的响应时间,但它很容易丢失一个数据包,并且单个数据包读取的开销成本加起来。
while (!( sender as BackgroundWorker ).CancellationPending) {
//read a single packet
//check for header or footer
//process packet data
}
}
在 C# 中读取这样的设备的最佳做法是什么?
背景:
我的 USB HID 设备不断报告大量数据。数据分为 26 个数据包,我必须保留顺序。不幸的是,该设备只标记每个报告中的第一个和最后一个数据包,因此我需要能够捕获其间的所有其他数据包。
对于 .Net 4,您可以使用 BlockingCollection
to provide a threadsafe queue that can be used by a producer and a consumer. The BlockingCollection.GetConsumingEnumerable()
method provides an enumerator which automatically terminates when the queue has been marked as completed using CompleteAdding()
并且是空的。
这是一些示例代码。在此示例中,有效负载是一个整数数组,但当然您可以使用您需要的任何数据类型。
请注意,对于您的具体示例,您可以使用 the overload of GetConsumingEnumerable()
which accepts an argument of type CancellationToken
。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BlockingCollection<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue);
Console.WriteLine("Finished.");
}
private static void consume(BlockingCollection<int[]> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
{
Console.WriteLine("Consuming " + item[0]);
Thread.Sleep(25);
}
}
private static void produce(BlockingCollection<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Add(payload);
Thread.Sleep(20);
}
queue.CompleteAdding();
}
}
}
对于 .Net 4.5 及更高版本,您可以使用 Microsoft's Task Parallel Library 中的更高级别 类,它具有丰富的功能(乍一看可能有些令人生畏)。
下面是使用 TPL DataFlow 的相同示例:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BufferBlock<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue).Wait();
Console.WriteLine("Finished.");
}
private static async Task consume(BufferBlock<int[]> queue)
{
while (await queue.OutputAvailableAsync())
{
var payload = await queue.ReceiveAsync();
Console.WriteLine("Consuming " + payload[0]);
await Task.Delay(25);
}
}
private static void produce(BufferBlock<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Post(payload);
Thread.Sleep(20);
}
queue.Complete();
}
}
}
如果丢失数据包是一个问题,请不要在同一个线程上进行处理和读取。从 .NET 4.0 开始,他们添加了 System.Collections.Concurrent
命名空间,这使得这很容易做到。您所需要的只是一个 BlockingCollection
,它充当传入数据包的队列。
BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
while (!( sender as BackgroundWorker ).CancellationPending)
{
Packet packet = GetPacket();
_queuedPackets.Add(packet);
}
_queuedPackets.CompleteAdding();
}
void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
List<Packet> report = new List<Packet>();
foreach(var packet in _queuedPackets.GetConsumingEnumerable())
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
当 _queuedPackets
为空时 _queuedPackets.GetConsumingEnumerable()
将阻塞线程而不消耗任何资源。一旦数据包到达,它将解除阻塞并执行 foreach 的下一次迭代。
当您调用 _queuedPackets.CompleteAdding();
时,处理线程上的 foreach 将 运行 直到集合为空,然后退出 foreach 循环。如果您不希望它在取消时变为 "finish up the queue",您可以轻松地将其更改为提前退出。我也打算改用 Tasks 而不是 Background worker,因为它使参数的传递更容易。
void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
Packet packet = GetPacket();
queue.Add(packet);
}
queue.CompleteAdding();
}
void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
List<Packet> report = new List<Packet>();
try
{
foreach(var packet in queue.GetConsumingEnumerable(token))
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
catch(OperationCanceledException)
{
//Do nothing, we don't care that it happened.
}
}
//This would replace your backgroundWorker.RunWorkerAsync() calls;
private void StartUpLoops()
{
var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
var cancelRead = new CancellationTokenSource();
var cancelProcess = new CancellationTokenSource();
Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token));
Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token));
//You can stop each loop indpendantly by calling cancelRead.Cancel() or cancelProcess.Cancel()
}