使用 Delphi + Jedi,设备发送 USB 数据时丢失 "too fast"
Using Delphi + Jedi, losing USB data when device sends it "too fast"
使用 Jedi
库中的 Delphi XE2
和 TJvHidDevice
class,我成功地与 USB 设备(pic32mx7
板,使用我的代码进行通信运行 就可以了)。 "send request, wait for single response" 的通常方式起作用。
问题在于导致大量连续响应的命令。如果设备尽可能快地发送这些响应——或者即使我在它们之间添加一个小延迟,比如 5 毫秒——我也会丢失数据包(报告?帧?)。 OnDeviceData
事件似乎并没有为所有人触发。如果我在设备代码中添加更大的延迟,问题就会消失。
我使用 USBPcap
程序捕获 USB 数据并将其转储到一个文件中,一旦我在 WireShark 中打开它,就会包含设备发送的所有数据(我发送了 255 个数据包作为测试,所有零和一个“1”在每个数据包中将其位置移动 1 个位置)。所以,我认为设备和 Windows 都在做他们的工作。
为了确保我的 Delphi 代码没有错误,我尝试了 Jedi 示例项目 "DevReader"(这里是 main.pas code),它在屏幕上转储数据并且丢失数据包还有。
我觉得网上应该有更多关于 Jedi 的 USB classes 的信息,但我找不到它。
我可以通过 aggregating/condensing 设备的响应来避免这个问题,但仍然想知道发生了什么。
编辑:
- 从控制台应用程序尝试:数据包不再丢失。
- 修改 Jedi 演示应用程序以仅计算接收到的数据包并更新屏幕上的计数器标签(没有强制 window 重绘)- 没有丢失数据包。
- 在 OnData 事件中添加了 sleep(1) - 没有丢失数据包。
- 在 OnData 事件中添加了 sleep(2) - 再次丢失数据包。
这看起来读取数据的 Jedi 线程不能被任何处理延迟 - 不应该有一些数据缓冲(通过 Windows?)允许这种类型的处理延误?从数据包丢失 "pattern" 来看,似乎有缓冲,但它是不够的,因为我可以接收例如30 个数据包然后丢失 5 个然后再收到 20 个等
我将修改我的代码以尽快复制数据并退出 OnData 事件,以便线程具有最小值 "downtime",我将报告结果。
由于问题的原因似乎与USB读取线程被Synchronise
阻塞的时间长短有关,即主线程进行的数据处理,我在线程中进行了修改代码,(TJvHidDeviceReadThread class,JvHidControllerClass.pas 单元)。使用此单元和包含的 classes 的任何代码无需任何修改即可正常工作,没有任何 public 更改。
新行为:每次读取数据时,都将其放入线程安全列表中。它现在使用 Queue
而不是同步,但前提是它尚未排队。 Queued 方法从线程安全列表中读取,直到它为空。它为列表中的每个缓冲报告触发一个事件(与旧代码中的事件相同)。一旦列表为空,"Queued" 标志被重置,下一次读取将再次导致排队。
在目前的测试中,我没有遇到丢包的情况。
线程 class 已扩展:
TJvHidDeviceReadThread = class(TJvCustomThread)
private
FErr: DWORD;
// start of additions
ReceivedReports : TThreadList;
Queued: boolean;
procedure PushReceivedReport(const bytes: array of byte; const NumBytesRead: cardinal);
function PopReceivedReport(var ReportID: byte; var ReportBytes: TBytes): boolean;
procedure FlushBuffer;
// end of additions
procedure DoData;
procedure DoDataError;
constructor CtlCreate(const Dev: TJvHidDevice);
protected
procedure Execute; override;
public
Device: TJvHidDevice;
NumBytesRead: Cardinal;
Report: array of Byte;
constructor Create(CreateSuspended: Boolean);
//added destructor:
destructor Destroy; override;
end;
在实现部分,修改了以下内容:
constructor TJvHidDeviceReadThread.CtlCreate(const Dev: TJvHidDevice);
begin
inherited Create(False);
// start of changes
ReceivedReports := TThreadList.Create;
// end of changes
Device := Dev;
NumBytesRead := 0;
SetLength(Report, Dev.Caps.InputReportByteLength);
end;
procedure TJvHidDeviceReadThread.Execute;
...
...
...
//replaced: Synchronize(DoData); with:
PushReceivedReport (Report, NumBytesRead);
...
并添加了以下内容:
type
TReport = class
ID: byte;
Bytes: TBytes;
end;
destructor TJvHidDeviceReadThread.Destroy;
var
l: TList;
begin
RemoveQueuedEvents (self);
try
l := ReceivedReports.LockList;
while l.Count>0 do
begin
TReport(l[0]).Free;
l.Delete(0);
end;
finally
ReceivedReports.UnlockList;
FreeAndNil (ReceivedReports);
end;
inherited;
end;
procedure TJvHidDeviceReadThread.FlushBuffer;
var
ReportID: byte;
ReportBytes: TBytes;
begin
while PopReceivedReport (ReportID, ReportBytes) do
Device.OnData(Device, ReportID, ReportBytes, length(ReportBytes));
end;
function TJvHidDeviceReadThread.PopReceivedReport(var ReportID: byte; var ReportBytes: TBytes): boolean;
var
l: TList;
rep: TReport;
begin
l := ReceivedReports.LockList;
rep := nil;
try
result := l.Count>0;
if result
then
begin
rep := l[0];
l.Delete(0);
end
else Queued := false;
finally
ReceivedReports.UnlockList;
end;
if result then
begin
ReportID := rep.ID;
SetLength(ReportBytes, length(rep.Bytes));
System.move (rep.Bytes[0], ReportBytes[0], length(rep.Bytes));
rep.Free;
end;
end;
procedure TJvHidDeviceReadThread.PushReceivedReport(const bytes: array of byte; const NumBytesRead: cardinal);
var
rep: TReport;
begin
rep := TReport.Create;
setlength (rep.Bytes, NumBytesRead-1);
rep.ID := Bytes[0];
System.move (Bytes[1], rep.Bytes[0], NumBytesRead-1);
// explicitely lock the list just to provide a locking mechanism for the Queue flag as well
ReceivedReports.LockList;
try
if not Queued then
begin
Queued := true;
Queue (FlushBuffer);
end;
ReceivedReports.Add(rep);
finally
ReceivedReports.UnlockList;
end;
end;
使用 Jedi
库中的 Delphi XE2
和 TJvHidDevice
class,我成功地与 USB 设备(pic32mx7
板,使用我的代码进行通信运行 就可以了)。 "send request, wait for single response" 的通常方式起作用。
问题在于导致大量连续响应的命令。如果设备尽可能快地发送这些响应——或者即使我在它们之间添加一个小延迟,比如 5 毫秒——我也会丢失数据包(报告?帧?)。 OnDeviceData
事件似乎并没有为所有人触发。如果我在设备代码中添加更大的延迟,问题就会消失。
我使用 USBPcap
程序捕获 USB 数据并将其转储到一个文件中,一旦我在 WireShark 中打开它,就会包含设备发送的所有数据(我发送了 255 个数据包作为测试,所有零和一个“1”在每个数据包中将其位置移动 1 个位置)。所以,我认为设备和 Windows 都在做他们的工作。
为了确保我的 Delphi 代码没有错误,我尝试了 Jedi 示例项目 "DevReader"(这里是 main.pas code),它在屏幕上转储数据并且丢失数据包还有。
我觉得网上应该有更多关于 Jedi 的 USB classes 的信息,但我找不到它。
我可以通过 aggregating/condensing 设备的响应来避免这个问题,但仍然想知道发生了什么。
编辑:
- 从控制台应用程序尝试:数据包不再丢失。
- 修改 Jedi 演示应用程序以仅计算接收到的数据包并更新屏幕上的计数器标签(没有强制 window 重绘)- 没有丢失数据包。
- 在 OnData 事件中添加了 sleep(1) - 没有丢失数据包。
- 在 OnData 事件中添加了 sleep(2) - 再次丢失数据包。
这看起来读取数据的 Jedi 线程不能被任何处理延迟 - 不应该有一些数据缓冲(通过 Windows?)允许这种类型的处理延误?从数据包丢失 "pattern" 来看,似乎有缓冲,但它是不够的,因为我可以接收例如30 个数据包然后丢失 5 个然后再收到 20 个等
我将修改我的代码以尽快复制数据并退出 OnData 事件,以便线程具有最小值 "downtime",我将报告结果。
由于问题的原因似乎与USB读取线程被Synchronise
阻塞的时间长短有关,即主线程进行的数据处理,我在线程中进行了修改代码,(TJvHidDeviceReadThread class,JvHidControllerClass.pas 单元)。使用此单元和包含的 classes 的任何代码无需任何修改即可正常工作,没有任何 public 更改。
新行为:每次读取数据时,都将其放入线程安全列表中。它现在使用 Queue
而不是同步,但前提是它尚未排队。 Queued 方法从线程安全列表中读取,直到它为空。它为列表中的每个缓冲报告触发一个事件(与旧代码中的事件相同)。一旦列表为空,"Queued" 标志被重置,下一次读取将再次导致排队。
在目前的测试中,我没有遇到丢包的情况。
线程 class 已扩展:
TJvHidDeviceReadThread = class(TJvCustomThread)
private
FErr: DWORD;
// start of additions
ReceivedReports : TThreadList;
Queued: boolean;
procedure PushReceivedReport(const bytes: array of byte; const NumBytesRead: cardinal);
function PopReceivedReport(var ReportID: byte; var ReportBytes: TBytes): boolean;
procedure FlushBuffer;
// end of additions
procedure DoData;
procedure DoDataError;
constructor CtlCreate(const Dev: TJvHidDevice);
protected
procedure Execute; override;
public
Device: TJvHidDevice;
NumBytesRead: Cardinal;
Report: array of Byte;
constructor Create(CreateSuspended: Boolean);
//added destructor:
destructor Destroy; override;
end;
在实现部分,修改了以下内容:
constructor TJvHidDeviceReadThread.CtlCreate(const Dev: TJvHidDevice);
begin
inherited Create(False);
// start of changes
ReceivedReports := TThreadList.Create;
// end of changes
Device := Dev;
NumBytesRead := 0;
SetLength(Report, Dev.Caps.InputReportByteLength);
end;
procedure TJvHidDeviceReadThread.Execute;
...
...
...
//replaced: Synchronize(DoData); with:
PushReceivedReport (Report, NumBytesRead);
...
并添加了以下内容:
type
TReport = class
ID: byte;
Bytes: TBytes;
end;
destructor TJvHidDeviceReadThread.Destroy;
var
l: TList;
begin
RemoveQueuedEvents (self);
try
l := ReceivedReports.LockList;
while l.Count>0 do
begin
TReport(l[0]).Free;
l.Delete(0);
end;
finally
ReceivedReports.UnlockList;
FreeAndNil (ReceivedReports);
end;
inherited;
end;
procedure TJvHidDeviceReadThread.FlushBuffer;
var
ReportID: byte;
ReportBytes: TBytes;
begin
while PopReceivedReport (ReportID, ReportBytes) do
Device.OnData(Device, ReportID, ReportBytes, length(ReportBytes));
end;
function TJvHidDeviceReadThread.PopReceivedReport(var ReportID: byte; var ReportBytes: TBytes): boolean;
var
l: TList;
rep: TReport;
begin
l := ReceivedReports.LockList;
rep := nil;
try
result := l.Count>0;
if result
then
begin
rep := l[0];
l.Delete(0);
end
else Queued := false;
finally
ReceivedReports.UnlockList;
end;
if result then
begin
ReportID := rep.ID;
SetLength(ReportBytes, length(rep.Bytes));
System.move (rep.Bytes[0], ReportBytes[0], length(rep.Bytes));
rep.Free;
end;
end;
procedure TJvHidDeviceReadThread.PushReceivedReport(const bytes: array of byte; const NumBytesRead: cardinal);
var
rep: TReport;
begin
rep := TReport.Create;
setlength (rep.Bytes, NumBytesRead-1);
rep.ID := Bytes[0];
System.move (Bytes[1], rep.Bytes[0], NumBytesRead-1);
// explicitely lock the list just to provide a locking mechanism for the Queue flag as well
ReceivedReports.LockList;
try
if not Queued then
begin
Queued := true;
Queue (FlushBuffer);
end;
ReceivedReports.Add(rep);
finally
ReceivedReports.UnlockList;
end;
end;