producer/consumer 通过队列获取不同的数据
producer/consumer get different data by Queue
解决了!!!谢谢
注意你放入的是什么类型的对象Queue.If你放了一个值,比如int,然后enqueue会复制一份,大家都很高兴。如果你把一个引用,比如byte[], string, enqueue 把这个引用放到队列里,那么问题就来了。如果在消费者读取之前更改此引用,则消费者将读取 changed 版本的数据。
为避免此问题,在入队后立即在 rxThread 中获取新版本的帧引用。代码:
public void rxThreadFunc()
{
byte[] data = new byte[datalen];//declare for the first iteration.
int j = 0;
while (true)
{
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\testsave\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
data = new byte[datalen];//create new reference after enqueue/Add
}
}
}
}//rxThreadFunc()
更新1
我刚刚写了另一个更简单的代码,这样每个人都可以在没有串口硬件的情况下测试它。单击按钮 运行 程序。
我认为线程优先级会导致此问题,在不更改 rxthread 的线程优先级的情况下,dataProc 线程将获得正确的数据。但我还是不知道为什么。
rxThread.Priority=ThreadPriority.Hightest
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Collections.Concurrent;
using System.Threading;
using System.IO;
namespace WindowsFormsApplication1
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
}
BlockingCollection<byte[]> mQ = new BlockingCollection<byte[]>();
int datalen = 30;
int rxlen = 200;
public void rxThreadFunc()
{
int j = 0;
while (true)
{
byte[] data = new byte[datalen];//is this in the right place?
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\testsave\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}//rxThreadFunc()
public void dataProc()
{
byte[] outData = new byte[datalen];
while (true)
{
if (mQ.Count > 1)
{
outData=mQ.Take();
using(StreamWriter fwriter=new StreamWriter("C:\testsave\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}
}
描述:
我写的这个应用程序包含两个线程。 RxThread从串口接收到数据,按照header mark 0x55 0xaa排序,将后面的30个字节放入一个FrameStructclass,然后将这个FrameStruct放入Queue。
dataProcess 线程从队列中获取帧,然后将其存储到磁盘。
SerialPort=(rxbuff)=>RxThread=(rxFrame,Queue)=>dataProcess==>disk
问题:
dataProcess 线程接收并保存到磁盘的数据不知何故已损坏。
尝试过:
这是我试过的,供大家参考。
- 我尝试了 BlockedCollection,它是自然线程安全的,而不是 Queue,它仍然不起作用。所以我想这不是队列的问题。
- 我尝试在FrameStruct中添加另一个成员int cnt,它在RxThread中的messageQ.Enqueue()之前自增。然后dataProcess线程就可以正确获取了。所以我认为可能是data[]有问题,但是...
- 但我尝试将字节数据[30]而不是 FrameStruct 放入队列中,但不起作用。
另外我觉得dataProcess收到的时间也是正确的。
5.如果我在 RxThread 中的 Monitor.Pulse() 之后放置一个 Thread.sleep(20),问题就解决了,但我不明白为什么???如果换台电脑呢?
这是代码快照。
//declared:
//Queue<FrameStruct>messageQ=new Queue<FrameStruct>;
//object _LockerMQ=new object();
private void RxThread()
{
int bytestoread, i;
bool f55 = false;//55 flag
bool fs = false;//frame start flag
int j=0;//data index in FrameStruct
int m_lMaxFram=32;
bytestoread = 0;
FrameStruct rxFrame = new FrameStruct((int)m_lMaxFrame);
while (true)
{
if (Serial_Port.IsOpen == true)
{
if ((bytestoread = Serial_Port.BytesToRead) > m_lMaxFrame*2)//get at least two frames
{
rxbuff = new byte[bytestoread];
Serial_Port.Read(rxbuff, 0, bytestoread);
for (i = 0; i < bytestoread; i++)
{
if (rxbuff[i] == 0x55)
{
f55 = true;
continue;
}
if (rxbuff[i] == 0xaa && f55)
{//frame header 0x55, 0xaa
//new frame start
fs = true;
f55 = false;
j = 0;//rxframe index;
rxFrame.time = DateTime.Now;//store the datetime when this thread gets this frame
continue;
}
if (fs && j < m_lMaxFrame - 2)
{//frame started but not ended
rxFrame.data[j] = rxbuff[i];
j++;
}
if (j >= (m_lMaxFrame - 2) && fs)
{//frame ended if j=30, reaches the end of rxFrame.data
fs = false;
lock(_LockerMQ)
{
messageQ.Enqueue(rxFrame);
Monitor.Pulse(_LockerMQ);
}
//Thread.Sleep(20);//if uncomment this sleep, problem solved
using (StreamWriter fWriter = new StreamWriter("c:\testsave\RXdata", true))//save rxThread result into a file rawdata
{
fWriter.Write(rxFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int k = 0; k < m_lMaxFrame - 2; k++)
{
fWriter.Write(rxFrame.data[k]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}
}
}//if ((bytestoread=Serial_Port.BytesToRead) > 0)
rxbuff = null;
Thread.Sleep(20);
}//(Serial_Port.IsOpen==true)
Thread.Sleep(100);
}//while(true),RxThread sleep
}//private void RxThread()
DataProcess 线程:
public void dataProcess()
{
while (true)
{
lock (_LockerMQ)
{
while (messageQ.Count < 1) Monitor.Wait(_LockerMQ);//get at least one frame data
f_NewFrame = messageQ.Count;
if (f_NewFrame > 0)
{
procFrame = messageQ.Dequeue();
using (StreamWriter fWriter = new StreamWriter("c:\testsave\dPdata", true))
{
fWriter.Write(procFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int i = 0; i < m_lMaxFrame - 2; i++)
{
fWriter.Write(procFrame.data[i]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}//if(f_NewFrame>0)
}//lock(messageQ)
}
}
FrameStruct 包含时间成员和数据[30]
class FrameStruct
{
public FrameStruct(int m_lMaxFrame)
{
time = DateTime.Now;
data = new byte[m_lMaxFrame - 2];
}
public DateTime time;
public volatile byte[] data;//volatile doesn't help
}
RxThread保存的rxData是正确的,显示:
2015/07/18 18:40:26.125,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,111,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,112,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,113,51,204,
2015/07/18 18:40:26.297,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,114,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,115,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,116,51,204,
2015/07/18 18:40:26.299,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,117,51,204,
2015/07/18 18:40:26.420,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,118,51,204,
//^this columns is accumulated number
dataProcessThread 保存的 dPdata 是错误的,显示:
2015/07/18 18:40:31.904,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,227,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,228,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,229,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.147,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,232,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,233,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,234,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.510,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,237,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,241,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
//^this accumulated number is not correct
请帮忙!
谢谢!
FrameStruct 是一个 class(而不是一个结构),当您将它排入队列时,您一遍又一遍地使用相同的引用。
更新 2
这是基本的生产者消费者,只需添加您的逻辑:
class Program
{
static BlockingCollection<int> mQ = new BlockingCollection<int>();
static void Main(string[] args)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
Console.ReadLine();
}
static public void rxThreadFunc()
{
for (int i = 0; i < 10; i++)
{
mQ.Add(i);
}
}
static public void dataProc()
{
foreach (int outData in mQ.GetConsumingEnumerable())
{
Console.WriteLine(outData);
}
}
}
更新 1 的答案:
- 是的,这是在正确的地方
现在,由于您使用的 BlockingCollection,消费者 (dataProc) 可能非常简单(只需删除循环和计数检查,它会为您完成所有这些同步):
foreach (byte[] outData in _taskQ.GetConsumingEnumerable())
{
using(StreamWriter fwriter=new StreamWriter("C:\testsave\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
现在,问题可能与原来的问题不同 post。也许是因为生产者在这里也将数据写入文件?
原文:
它有很多解释,但解决方案很简单,只需将 FrameStruct 的初始化添加到您的第二个 "if" 语句:
rxFrame = new FrameStruct((int)m_lMaxFrame);
如您保存的数据文件所示:在损坏的文件中,每次当您有缺失值(例如 230)时 - 您都会复制其他值(例如 231)。缺失值的计数等于重复值的计数。
原因是您将对同一对象实例的引用添加到您的队列中。
让我们看看下面的场景:RxThread 循环了 N 次,在它上下文切换到 dataProcess 线程之前,它向队列中添加了对同一个 FrameStruct 实例的 N 个引用。此实例中的数据将是上下文切换之前最后一次读取循环迭代的数据。
现在上下文切换发生了:dataProcess 在上下文切换回 RxThread 之前循环了 M < N 次,因此它从队列中读取 M 个元素,但所有元素都指向同一个实例,因此它在同一行中写入 M 次文件(最后一个如前所述)
现在,为什么 Thread.Sleep 有帮助。
简短的回答:每次 RxThread 将 1 个元素添加到队列时,上下文切换到 dataProcess 线程的可能性非常高。所以它实际上是:读一个 --> 上下文切换 --> 写一个... 又是同样的事情。
长答案是:在 dataProcess 线程执行 Monitor.Wait 之后,它进入等待队列,上下文切换调度 RxThread。现在线程将第一个元素添加到队列中并执行 Monitor.Pulse。这会将 dataProcess 线程移动到就绪队列。但不一定立即为 运行 调度它,因此 RxThread 可以进行另一次迭代。但是,如果您执行 Thread.Sleep - 现在很有可能会进行上下文切换和 ataProcess 线程。
解决了!!!谢谢
注意你放入的是什么类型的对象Queue.If你放了一个值,比如int,然后enqueue会复制一份,大家都很高兴。如果你把一个引用,比如byte[], string, enqueue 把这个引用放到队列里,那么问题就来了。如果在消费者读取之前更改此引用,则消费者将读取 changed 版本的数据。
为避免此问题,在入队后立即在 rxThread 中获取新版本的帧引用。代码:
public void rxThreadFunc()
{
byte[] data = new byte[datalen];//declare for the first iteration.
int j = 0;
while (true)
{
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\testsave\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
data = new byte[datalen];//create new reference after enqueue/Add
}
}
}
}//rxThreadFunc()
更新1 我刚刚写了另一个更简单的代码,这样每个人都可以在没有串口硬件的情况下测试它。单击按钮 运行 程序。 我认为线程优先级会导致此问题,在不更改 rxthread 的线程优先级的情况下,dataProc 线程将获得正确的数据。但我还是不知道为什么。
rxThread.Priority=ThreadPriority.Hightest
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Collections.Concurrent;
using System.Threading;
using System.IO;
namespace WindowsFormsApplication1
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
}
BlockingCollection<byte[]> mQ = new BlockingCollection<byte[]>();
int datalen = 30;
int rxlen = 200;
public void rxThreadFunc()
{
int j = 0;
while (true)
{
byte[] data = new byte[datalen];//is this in the right place?
for (int i = 0; i < rxlen; i++)
{
data[j] = (byte)i;
j++;
if (j >= datalen)
{
j = 0;
mQ.Add(data);
using (StreamWriter fwriter = new StreamWriter("C:\testsave\rxdata", true))
{
for (int k = 0; k < datalen; k++)
{
fwriter.Write(data[k]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}//rxThreadFunc()
public void dataProc()
{
byte[] outData = new byte[datalen];
while (true)
{
if (mQ.Count > 1)
{
outData=mQ.Take();
using(StreamWriter fwriter=new StreamWriter("C:\testsave\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
}
}
}
}
描述: 我写的这个应用程序包含两个线程。 RxThread从串口接收到数据,按照header mark 0x55 0xaa排序,将后面的30个字节放入一个FrameStructclass,然后将这个FrameStruct放入Queue。 dataProcess 线程从队列中获取帧,然后将其存储到磁盘。
SerialPort=(rxbuff)=>RxThread=(rxFrame,Queue)=>dataProcess==>disk
问题: dataProcess 线程接收并保存到磁盘的数据不知何故已损坏。
尝试过: 这是我试过的,供大家参考。
- 我尝试了 BlockedCollection,它是自然线程安全的,而不是 Queue,它仍然不起作用。所以我想这不是队列的问题。
- 我尝试在FrameStruct中添加另一个成员int cnt,它在RxThread中的messageQ.Enqueue()之前自增。然后dataProcess线程就可以正确获取了。所以我认为可能是data[]有问题,但是...
- 但我尝试将字节数据[30]而不是 FrameStruct 放入队列中,但不起作用。
另外我觉得dataProcess收到的时间也是正确的。
5.如果我在 RxThread 中的 Monitor.Pulse() 之后放置一个 Thread.sleep(20),问题就解决了,但我不明白为什么???如果换台电脑呢?
这是代码快照。
//declared:
//Queue<FrameStruct>messageQ=new Queue<FrameStruct>;
//object _LockerMQ=new object();
private void RxThread()
{
int bytestoread, i;
bool f55 = false;//55 flag
bool fs = false;//frame start flag
int j=0;//data index in FrameStruct
int m_lMaxFram=32;
bytestoread = 0;
FrameStruct rxFrame = new FrameStruct((int)m_lMaxFrame);
while (true)
{
if (Serial_Port.IsOpen == true)
{
if ((bytestoread = Serial_Port.BytesToRead) > m_lMaxFrame*2)//get at least two frames
{
rxbuff = new byte[bytestoread];
Serial_Port.Read(rxbuff, 0, bytestoread);
for (i = 0; i < bytestoread; i++)
{
if (rxbuff[i] == 0x55)
{
f55 = true;
continue;
}
if (rxbuff[i] == 0xaa && f55)
{//frame header 0x55, 0xaa
//new frame start
fs = true;
f55 = false;
j = 0;//rxframe index;
rxFrame.time = DateTime.Now;//store the datetime when this thread gets this frame
continue;
}
if (fs && j < m_lMaxFrame - 2)
{//frame started but not ended
rxFrame.data[j] = rxbuff[i];
j++;
}
if (j >= (m_lMaxFrame - 2) && fs)
{//frame ended if j=30, reaches the end of rxFrame.data
fs = false;
lock(_LockerMQ)
{
messageQ.Enqueue(rxFrame);
Monitor.Pulse(_LockerMQ);
}
//Thread.Sleep(20);//if uncomment this sleep, problem solved
using (StreamWriter fWriter = new StreamWriter("c:\testsave\RXdata", true))//save rxThread result into a file rawdata
{
fWriter.Write(rxFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int k = 0; k < m_lMaxFrame - 2; k++)
{
fWriter.Write(rxFrame.data[k]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}
}
}//if ((bytestoread=Serial_Port.BytesToRead) > 0)
rxbuff = null;
Thread.Sleep(20);
}//(Serial_Port.IsOpen==true)
Thread.Sleep(100);
}//while(true),RxThread sleep
}//private void RxThread()
DataProcess 线程:
public void dataProcess()
{
while (true)
{
lock (_LockerMQ)
{
while (messageQ.Count < 1) Monitor.Wait(_LockerMQ);//get at least one frame data
f_NewFrame = messageQ.Count;
if (f_NewFrame > 0)
{
procFrame = messageQ.Dequeue();
using (StreamWriter fWriter = new StreamWriter("c:\testsave\dPdata", true))
{
fWriter.Write(procFrame.time.ToString("yyyy/MM/dd HH:mm:ss.fff"));
fWriter.Write(",");
for (int i = 0; i < m_lMaxFrame - 2; i++)
{
fWriter.Write(procFrame.data[i]);
fWriter.Write(",");
}
fWriter.Write("\n");
}
}//if(f_NewFrame>0)
}//lock(messageQ)
}
}
FrameStruct 包含时间成员和数据[30]
class FrameStruct
{
public FrameStruct(int m_lMaxFrame)
{
time = DateTime.Now;
data = new byte[m_lMaxFrame - 2];
}
public DateTime time;
public volatile byte[] data;//volatile doesn't help
}
RxThread保存的rxData是正确的,显示:
2015/07/18 18:40:26.125,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,111,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,112,51,204,
2015/07/18 18:40:26.177,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,113,51,204,
2015/07/18 18:40:26.297,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,114,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,115,51,204,
2015/07/18 18:40:26.298,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,116,51,204,
2015/07/18 18:40:26.299,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,117,51,204,
2015/07/18 18:40:26.420,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,118,51,204,
//^this columns is accumulated number
dataProcessThread 保存的 dPdata 是错误的,显示:
2015/07/18 18:40:31.904,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,227,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,228,51,204,
2015/07/18 18:40:31.905,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,229,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.026,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,231,51,204,
2015/07/18 18:40:32.147,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,232,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,233,51,204,
2015/07/18 18:40:32.148,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,234,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.269,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,236,51,204,
2015/07/18 18:40:32.510,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,237,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.512,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,240,51,204,
2015/07/18 18:40:32.514,127,255,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,241,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
2015/07/18 18:40:32.635,128,0,255,255,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,243,51,204,
//^this accumulated number is not correct
请帮忙!
谢谢!
FrameStruct 是一个 class(而不是一个结构),当您将它排入队列时,您一遍又一遍地使用相同的引用。
更新 2
这是基本的生产者消费者,只需添加您的逻辑:
class Program
{
static BlockingCollection<int> mQ = new BlockingCollection<int>();
static void Main(string[] args)
{
Thread rxThread = new Thread(rxThreadFunc);
rxThread.Priority = ThreadPriority.Highest;//this causes problem
rxThread.Start();
Thread procThread = new Thread(dataProc);
procThread.Start();
Console.ReadLine();
}
static public void rxThreadFunc()
{
for (int i = 0; i < 10; i++)
{
mQ.Add(i);
}
}
static public void dataProc()
{
foreach (int outData in mQ.GetConsumingEnumerable())
{
Console.WriteLine(outData);
}
}
}
更新 1 的答案:
- 是的,这是在正确的地方
现在,由于您使用的 BlockingCollection,消费者 (dataProc) 可能非常简单(只需删除循环和计数检查,它会为您完成所有这些同步):
foreach (byte[] outData in _taskQ.GetConsumingEnumerable())
{
using(StreamWriter fwriter=new StreamWriter("C:\testsave\dataProc",true))
{
for (int i = 0; i < datalen; i++)
{
fwriter.Write(outData[i]);
fwriter.Write(",");
}
fwriter.Write("\n");
}
}
现在,问题可能与原来的问题不同 post。也许是因为生产者在这里也将数据写入文件?
原文:
它有很多解释,但解决方案很简单,只需将 FrameStruct 的初始化添加到您的第二个 "if" 语句:
rxFrame = new FrameStruct((int)m_lMaxFrame);
如您保存的数据文件所示:在损坏的文件中,每次当您有缺失值(例如 230)时 - 您都会复制其他值(例如 231)。缺失值的计数等于重复值的计数。
原因是您将对同一对象实例的引用添加到您的队列中。 让我们看看下面的场景:RxThread 循环了 N 次,在它上下文切换到 dataProcess 线程之前,它向队列中添加了对同一个 FrameStruct 实例的 N 个引用。此实例中的数据将是上下文切换之前最后一次读取循环迭代的数据。 现在上下文切换发生了:dataProcess 在上下文切换回 RxThread 之前循环了 M < N 次,因此它从队列中读取 M 个元素,但所有元素都指向同一个实例,因此它在同一行中写入 M 次文件(最后一个如前所述)
现在,为什么 Thread.Sleep 有帮助。 简短的回答:每次 RxThread 将 1 个元素添加到队列时,上下文切换到 dataProcess 线程的可能性非常高。所以它实际上是:读一个 --> 上下文切换 --> 写一个... 又是同样的事情。
长答案是:在 dataProcess 线程执行 Monitor.Wait 之后,它进入等待队列,上下文切换调度 RxThread。现在线程将第一个元素添加到队列中并执行 Monitor.Pulse。这会将 dataProcess 线程移动到就绪队列。但不一定立即为 运行 调度它,因此 RxThread 可以进行另一次迭代。但是,如果您执行 Thread.Sleep - 现在很有可能会进行上下文切换和 ataProcess 线程。