在 Raspberry Pi 3B+ 的 serial0 上输出时,当写入系统调用通过其他内核进程中断时,会传输一个虚假字节
When outputting on serial0 of the Rasberry Pi 3B+ a spurious byte is transmitted when write syscall is interrupted via other kernel processes
我 运行 我正在生成 运行dom 数据包,以便 运行 通过 Raspberry PI 3b+ 的 uart 发送 Raspbian OS。我发现在对 运行d() 调用 240 次左右后,我的数据包中生成了一个额外的字符,这可能导致 1 字节缓冲区超过 运行。因此,如果我的数据包的数据长度为 48 字节,则接收设备将在线路上看到 49 字节,但 t运行smitting 设备看不到。我已通过接收设备上的内存分析验证在线路上发送了 1 个额外字节。
有没有人 运行 遇到这样的情况:如果 write() 系统调用在写入 /dev/serial0 设备套接字时被中断,raspberry pi 将发送不正确的字节?
我最好的猜测是当 Linux 中的伪 运行dom 号码池在该系统调用期间通过内核刷新时。当我 t运行smit 一个 48 字节固定数据的数据数组时,我没有得到那个虚假字节 t运行smission,但是如果我自动生成 .[=18,我会得到那个虚假字节=]
我写的程序有2个pthreads。一个用于生成数据包,另一个用于 t运行 发送它们。 t运行smitting 线程在 t=0 时自动休眠,因为没有数据要发送。生成数据的线程会在生成 4 个数据包后唤醒 t运行smitting 线程。即使我在每次 t运行smission 后将 t运行smitting 线程休眠 1 秒,我也可以定期重现此问题。
我试图减少未使用的代码,但它仍然超过 300 行。这是一个常规问题,现在它和我玩猫捉老鼠。
To Build:
gcc -pthread -g -o exp exp.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256
#define DATPOOLSZ 16
typedef union __attribute__((packed)) PACKET
{
struct
{
uint8_t SID;
uint8_t DID;
uint8_t LEN;
uint8_t SPT;
uint8_t DPT;
uint8_t TID;
uint8_t DAT[249];//very last byte is the checksum
};
uint8_t packet[NETPAYLOADSZ];
}uPacket;
uPacket *DataPool[DATPOOLSZ];
enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };
struct SerRes
{
int sigint;
int serialFD;
int datInPoolIdx; //writen by GenDat, read by SerTx
int datOutPoolIdx;//written by SerTx
enum THRDSTAT statusGen;
enum THRDSTAT statusTX;
uint8_t dataAvail;
pthread_mutex_t sermut;
pthread_mutex_t genSleepmux;
pthread_mutex_t serTxSleepmux;
pthread_cond_t genDatSleep;
pthread_cond_t serTxSleep;
};
struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];
void SIGHandler(int num)
{
struct termios options;
if(serres.sigint == 1)
{
close(serres.serialFD);
serres.statusGen = eSTOPPED;
serres.statusTX = eSTOPPED;
exit(-1);
return;
}
tcgetattr(serres.serialFD, &options);
options.c_cc[VTIME] = 1; //timeout of 100ms
options.c_cc[VMIN] = 0; //1 receive atleast 1 character
tcsetattr(serres.serialFD, TCSANOW, &options);
serres.sigint = 1;
}
void Init(char *serpath)
{
struct termios options;
memset(&serres, 0, sizeof(struct SerRes));
memset(&DataPool, 0, sizeof(uPacket*)*DATPOOLSZ);
pthread_mutex_init(&serres.sermut, NULL);
pthread_mutex_init(&serres.genSleepmux, NULL);
pthread_cond_init(&serres.genDatSleep, NULL);
pthread_cond_init(&serres.serTxSleep, NULL);
serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
if(serres.serialFD < 0)
{
printf("\nError no is: %d", errno);
printf("\nError description: %s\n", strerror(errno));
exit(-1);
}
cfmakeraw(&options);
tcgetattr(serres.serialFD, &options);
options.c_cflag = CS8 | CLOCAL | CREAD;
options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
options.c_cc[VTIME] = 10; //timeout of 100ms
options.c_cc[VMIN] = 1; //1 receive atleast 1 character
cfsetspeed(&options, B115200); //set both input and output speed
tcsetattr(serres.serialFD, TCSANOW, &options);
}
void Deinit(void)
{
close(serres.serialFD);
pthread_mutex_destroy(&serres.sermut);
pthread_mutex_destroy(&serres.genSleepmux);
pthread_cond_destroy(&serres.genDatSleep);
}
void *DatGen(void *arg)
{
int randDev = 0;
uint8_t idx;
uint8_t cksum = 0;
uint8_t buffer[48] = {0x91, 0x39, 0x97, 0xb9, 0x50, 0x9b, 0x7a, 0x33, 0xe3, 0x1, 0xfa, 0x82, 0x61, 0xbd, 0xec, 0x1,\
0x8, 0x5, 0xd, 0x9c, 0x27, 0xcc, 0x4e, 0x8e, 0x63, 0x48, 0x37, 0x3b, 0x66, 0xde, 0x48, 0x77,\
0x98, 0xdf, 0x31, 0x68, 0xfa, 0x2b, 0x9b, 0x5f, 0x2c, 0x96, 0xe1, 0xd, 0x54, 0x4f, 0xf, 0x5c};
pid_t tid = (pid_t)syscall(__NR_gettid);
printf("\nDatGen - %d: Starting: \r\n", tid);
serres.statusGen = eRUNNING;
srand(0x089FFEE4);
while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if((serres.dataAvail == DATPOOLSZ))
{
printf("DatGen - %d: No more Data to Generate: Sleeping - %d\r\n", tid, serres.dataAvail );
serres.statusGen = eSLEEPING;
while(serres.dataAvail == DATPOOLSZ) //Gaurd against spurious wake up events
{
pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
}
printf("Datgen - %d: ******** Wokeup, running\r\n\n", tid);
if(serres.statusTX == eSTOPPED)
break;
serres.statusGen = eRUNNING;
}
pthread_mutex_unlock(&serres.genSleepmux);
//Time to wake up the SerTX thread? Maybe?
if(serres.dataAvail > 3 && serres.statusTX == eSLEEPING)
{
pthread_mutex_lock(&serres.serTxSleepmux);
pthread_cond_signal(&serres.serTxSleep);
pthread_mutex_unlock(&serres.serTxSleepmux);
}
//Generate the Packets.
idx = serres.datInPoolIdx;
serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
assert(serres.datInPoolIdx < DATPOOLSZ);
stDatPool[idx].SID = 0x55;
stDatPool[idx].DID = 0xAA;
stDatPool[idx].LEN = 0x30; //(rand() % 100);
stDatPool[idx].SPT = 0xEE;
stDatPool[idx].DPT = 0x22;
stDatPool[idx].TID = 0x77;
for(int i = 0; i < stDatPool[idx].LEN ; i++)
{
stDatPool[idx].DAT[i] = rand() % 100; //Only Write
cksum += stDatPool[idx].DAT[i];
}
stDatPool[idx].LEN += 7;
cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
cksum += stDatPool[idx].DPT + stDatPool[idx].TID;
stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail++; //Touched by both threads...
assert(serres.dataAvail < DATPOOLSZ+1);
if(serres.dataAvail == DATPOOLSZ)
{
printf("Max Dat Reached\r\n");
}
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
}
serres.statusGen = eSTOPPED;
pthread_exit(&serres.sigint);
}
#define LOOPCOUNT 8
void *SerTx(void *arg)
{
pid_t tid = (pid_t)syscall(__NR_gettid);
uint8_t idx = 0;
uint16_t randel = 0;
uint8_t count = 0;
uint8_t bytesSent = 0;
serres.statusTX = eRUNNING;
while(serres.sigint == 0 && serres.statusTX != eSTOPPED && count < LOOPCOUNT)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if(serres.dataAvail < 1)
{
pthread_cond_signal(&serres.genDatSleep);
printf("SerTx - %d: All Data Consumed\r\n", tid);
serres.statusTX = eSLEEPING;
while(serres.dataAvail < 1) //Gaurd against spurious wakeup events.
{
pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);
}
serres.statusTX = eRUNNING;
printf("SerTx - %d: ^^^^^^^ Woke up Running\r\n", tid);
}
pthread_mutex_unlock(&serres.genSleepmux);
//Output
idx = serres.datOutPoolIdx;
serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ - 1);
bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read
if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
{
printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
assert(0);
}
printf("Consume: %x\r\n", stDatPool[idx].LEN);
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail--; //shared write
assert(serres.dataAvail < DATPOOLSZ); //unsigned, so if it goes negative, it goes BIG!!
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
//usleep(1000000);
//tcflush(serres.serialFD, TCIOFLUSH);
count++;
}
serres.statusTX = eSTOPPED;
pthread_cond_signal(&serres.genDatSleep);
pthread_exit(&serres.sigint);
}
/*
* pthread DatGen generates the data for serTx
* pthread SerTx consumes the data generated by DatGen and sends out the serial port
*/
int main(int argc, char **argv)
{
pthread_t datGen, serRx, serTx;
pid_t pid = getpid();
int thrdstat = 0;
Init(SPORT);
signal(SIGINT, SIGHandler);
pthread_create(&datGen, NULL, DatGen, NULL);
pthread_create(&serTx, NULL, SerTx, NULL);
//Wait for all the threads to close
pthread_join(datGen,NULL);
pthread_join(serTx,NULL);
Deinit();
printf("\n>>>> End <<<<< %d\r\n", pid);
return 0;
}
This is generated and subsequently transmitted in bulk
packet =
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad,
0xeb, 0xc9, 0xa, 0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0, 0xc1,
0xa3, 0x0, 0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3 , 0x40,
0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20, 0xca,
0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9, 0x28,
0xd7, 0x1d, 0xa7, 0x8c, 0xff
This is received by the device.
packet =
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad,
0xeb, 0xc9, 0xd, 0xa, 0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0,
0xc1, 0xa3, 0x0, 0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3,
0x40, 0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20,
0xca, 0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9,
0x28, 0xd7, 0x1d, 0xa7, 0x8c
字节 22(0 索引)不正确。 0xD 在接收中代替 0xC9 和最后一个字节,0xFF 未被接收器正确接收。我没有提到没有流量控制,这是设计使然。
更新...
所有功劳归功于:Craig Estey
好的,我想我有决心了。我遇到的问题是没有正确设置串行套接字。我发现我的电话是:
cfmakeraw(&options);
在错误的位置。因此,c_oflags 的 OPOST 选项未取消断言。正如下面在答案更新中指出的那样,当内核在我的数据中看到 0xA
时,它会自动发送 0xD
。更正后的代码不再显示此行为。
其次有人指出我的生产者-消费者关系存在Race Condition。我不一定同意分析,但我还是看了看并更改了程序中定义的 full 和 empty 的方式。我想我本来打算走这条路,但由于 pthreads 中的信号丢失,我遇到了线程同步问题……烦人……
pthread 丢失唤醒:Lost wakeups in pthreads
似乎有效的最终代码:
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256
typedef union __attribute__((packed)) PACKET
{
struct
{
uint8_t SID;
uint8_t DID;
uint8_t LEN;
uint8_t SPT;
uint8_t DPT;
uint8_t TID;
uint8_t DAT[249];//very last byte is the checksum
};
uint8_t packet[NETPAYLOADSZ];
}uPacket;
enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };
struct SerRes
{
int sigint;
int serialFD;
uint16_t datInPoolIdx; //writen by GenDat, read by SerTx
uint16_t datOutPoolIdx;//written by SerTx
enum THRDSTAT statusGen;
enum THRDSTAT statusTX;
uint8_t genSig;
uint8_t txSig;
uint8_t StartCond;
uint8_t dataAvail;
pthread_mutex_t sermut;
pthread_mutex_t genSleepmux;
pthread_mutex_t serTxSleepmux;
pthread_cond_t genDatSleep;
pthread_cond_t serTxSleep;
};
#define DATPOOLSZ 16
struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];
void SIGHandler(int num)
{
struct termios options;
if(serres.sigint == 1)
{
close(serres.serialFD);
serres.statusGen = eSTOPPED;
serres.statusTX = eSTOPPED;
exit(-1);
return;
}
tcgetattr(serres.serialFD, &options);
options.c_cc[VTIME] = 1; //timeout of 100ms
options.c_cc[VMIN] = 0; //1 receive atleast 1 character
tcsetattr(serres.serialFD, TCSANOW, &options);
serres.sigint = 1;
}
void Init(char *serpath)
{
struct termios options;
memset(&serres, 0, sizeof(struct SerRes));
memset(&stDatPool, 0, sizeof(uPacket*)*DATPOOLSZ);
//serres.datInPoolIdx = 1; //starting condition
serres.StartCond = 1;
pthread_mutex_init(&serres.sermut, NULL);
pthread_mutex_init(&serres.genSleepmux, NULL);
pthread_cond_init(&serres.genDatSleep, NULL);
pthread_cond_init(&serres.serTxSleep, NULL);
serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
if(serres.serialFD < 0)
{
printf("\nError no is: %d", errno);
printf("\nError description: %s\n", strerror(errno));
exit(-1);
}
tcgetattr(serres.serialFD, &options);
cfmakeraw(&options);
options.c_cflag = CS8 | CLOCAL | CREAD;
options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
options.c_oflag &= ~OPOST;
options.c_cc[VTIME] = 10; //timeout of 100ms
options.c_cc[VMIN] = 1; //1 receive atleast 1 character
cfsetspeed(&options, B115200); //set both input and output speed
tcsetattr(serres.serialFD, TCSANOW, &options);
}
void Deinit(void)
{
close(serres.serialFD);
pthread_mutex_destroy(&serres.sermut);
pthread_mutex_destroy(&serres.genSleepmux);
pthread_cond_destroy(&serres.genDatSleep);
}
void *DatGen(void *arg)
{
int randDev = 0;
uint8_t idx;
uint8_t cksum = 0;
pid_t tid = (pid_t)syscall(__NR_gettid);
printf("\nDatGen - %d: Starting: \r\n", tid);
serres.statusGen = eRUNNING;
srand(0x089FFEE4);
serres.datInPoolIdx = 1;// (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if(serres.datInPoolIdx == serres.datOutPoolIdx) //full condition
{
printf("DatGen - %d: Sleeping - %d\r\n", tid, serres.dataAvail );
serres.statusGen = eSLEEPING;
serres.StartCond = 0;
if(serres.statusTX == eSLEEPING)
{
serres.txSig = 1;
pthread_cond_signal(&serres.serTxSleep);
}
while((serres.datInPoolIdx == serres.datOutPoolIdx) && (serres.genSig == 0)) //Gaurd against spurious wake up events
{
serres.genSig = 0;
pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
}
serres.genSig = 0;
printf("Datgen - %d: Wokeup\r\n", tid);
if(serres.statusTX == eSTOPPED)
break;
serres.statusGen = eRUNNING;
}
idx = serres.datInPoolIdx;
assert(idx < DATPOOLSZ);
serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
pthread_mutex_unlock(&serres.genSleepmux);
//Time to wake up the SerTX thread? Maybe?
//Generate the Packets.
stDatPool[idx].SID = 0x55;
stDatPool[idx].DID = 0xAA;
stDatPool[idx].LEN = 0x30; //(rand() % 100);
stDatPool[idx].SPT = 0xEE;
stDatPool[idx].DPT = 0x22;
stDatPool[idx].TID = 0x77;
for(int i = 0; i < stDatPool[idx].LEN ; i++)
{
stDatPool[idx].DAT[i] = i; //rand() % 100; //Only Write
cksum += stDatPool[idx].DAT[i];
}
stDatPool[idx].LEN += 7;
cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
cksum += stDatPool[idx].DPT + stDatPool[idx].TID;
stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail++; //Touched by both threads...
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
}
serres.statusGen = eSTOPPED;
pthread_exit(&serres.sigint);
}
#define LOOPCOUNT 8
void *SerTx(void *arg)
{
pid_t tid = (pid_t)syscall(__NR_gettid);
uint8_t idx = 0;
uint16_t randel = 0;
uint32_t count = 0;
uint8_t bytesSent = 0;
serres.statusTX = eRUNNING;
printf("SerTx - %d: Starting\r\n", tid);
while(serres.sigint == 0 && serres.statusTX != eSTOPPED)// && count < LOOPCOUNT)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ -1);
if((serres.datOutPoolIdx == serres.datInPoolIdx)) //Empty Condition, sleep on first start.
{
if(serres.statusGen == eSLEEPING)
{
printf("Wake GenDat\r\n");
pthread_cond_signal(&serres.genDatSleep);
serres.genSig = 1;
}
printf("SerTx - %d: Sleep\r\n", tid);
serres.statusTX = eSLEEPING;
while((serres.datOutPoolIdx == serres.datInPoolIdx) && serres.txSig == 0) //Gaurd against spurious wakeup events.
{
serres.txSig = 0;
pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);
}
serres.txSig = 0;
serres.statusTX = eRUNNING;
printf("SerTx - %d: Running\r\n", tid);
}
idx = serres.datOutPoolIdx;
assert(idx < DATPOOLSZ);
pthread_mutex_unlock(&serres.genSleepmux);
//Output
if(stDatPool[idx].SID != 0x55)
assert(stDatPool[idx].SID == 0x55);
if(stDatPool[idx].DID != 0xAA)
assert(stDatPool[idx].DID != 0xAA);
if(stDatPool[idx].LEN != 0x37) //(rand() % 100);
assert(stDatPool[idx].LEN == 0x37);
if(stDatPool[idx].SPT != 0xEE)
assert(stDatPool[idx].SPT == 0xEE);
if(stDatPool[idx].DPT != 0x22)
assert(stDatPool[idx].DPT == 0x22);
if(stDatPool[idx].TID != 0x77)
assert(stDatPool[idx].TID == 0x77);
for(int i = 0; i < (stDatPool[idx].LEN-7); i++)
{
assert(stDatPool[idx].DAT[i] == i);
}
if(stDatPool[idx].packet[stDatPool[idx].LEN-1] != 0xFF)
assert(stDatPool[idx].packet[stDatPool[idx].LEN-1] == 0xFF);
/* bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read
if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
{
printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
assert(0);
}
*/
//printf("Consume: %d\r\n", stDatPool[idx].LEN);
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
memset(&stDatPool[idx], 0, sizeof(stDatPool[idx]));
serres.dataAvail--; //shared write
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
//usleep(1000000);
//tcflush(serres.serialFD, TCIOFLUSH);
count++;
}
serres.statusTX = eSTOPPED;
pthread_cond_signal(&serres.genDatSleep);
pthread_exit(&serres.sigint);
}
/*
* pthread DatGen generates the data for serTx
* pthread SerTx consumes the data generated by DatGen and sends out the serial port
*/
int main(int argc, char **argv)
{
pthread_t datGen, serRx, serTx;
pid_t pid = getpid();
int thrdstat = 0;
Init(SPORT);
signal(SIGINT, SIGHandler);
pthread_create(&datGen, NULL, DatGen, NULL);
pthread_create(&serTx, NULL, SerTx, NULL);
//Wait for all the threads to close
pthread_join(datGen,NULL);
while(serres.StartCond == 1);
pthread_join(serTx,NULL);
Deinit();
printf("\n>>>> End <<<<< %d\r\n", pid);
return 0;
}
编辑: 感谢您编辑您的问题并发布您生成的数据和相应的 实际 在远程接收的数据 设备.
您的问题[和解决方案]简单得多。请参阅下面的 UPDATE #2
部分。
您正在访问您的环队列索引变量(例如 datInPoolIdx
和 datOutPoolIdx
)在 锁定区域之外。
我很钦佩你为此付出的所有努力。但是,我认为你有点太复杂了。
你真的只需要在锁下访问索引变量。
松散地...
如果环形队列为空,Tx 线程应该休眠:
enqidx == deqidx
如果环队列已满,生成线程应该休眠:
((enqidx + 1) % DATPOOLSIZ) == deqidx
pthread_cond_wait/pthread_cond_signal
条件应基于使用类似上述方法对这些值的比较。
这是一些伪代码,大致基于您所拥有的。它没有任何条件变量,但我想您会了解如何在需要时添加它。
这些函数一次只能处理一个字节。但是,有一种方法可以修改它们,因此它们会产生 连续 字节的长度和数量,无论是免费的还是可用的,因此您可以使用 memcpy
移动一堆字节 into/out 批量队列。
int
queue_wrap_idx(int idx)
{
idx += 1;
idx %= DATPOOLSIZ;
return idx;
}
// gen_avail -- space available in queue
// RETURNS: index of place to store (or -1=full)
int
gen_avail(void)
{
lock();
int idxenq = datInPoolIdx;
int idxdeq = datOutPoolIdx;
unlock();
int idxfull = queue_wrap_idx(idxenq);
if (idxfull == idxdeq)
idxenq = -1;
return idxenq;
}
// gen_advance -- advance generator queue index
void
gen_advance(void)
{
lock();
int idxenq = datInPoolIdx;
idxenq = queue_wrap_idx(idxenq);
datInPoolIdx = idxenq;
unlock();
}
// tx_avail -- data available in queue
// RETURNS: index of place to dequeue (or -1=empty)
int
tx_avail(void)
{
lock();
int idxenq = datInPoolIdx;
int idxdeq = datOutPoolIdx;
unlock();
if (idxdeq == idxenq)
idxdeq = -1;
return idxdeq;
}
// tx_advance -- advance transmitter queue index
void
tx_advance(void)
{
lock();
int idxdeq = datOutPoolIdx;
idxdeq = queue_wrap_idx(idxdeq);
datOutPoolIdx = idxdeq;
unlock();
}
// gen_thread -- data generator thread
void
gen_thread(void *ptr)
{
while (1) {
int idxenq = gen_avail();
if (idxenq >= 0) {
DAT[idxenq] = rand();
gen_advance();
}
}
}
// tx_thread -- serial port transmit thread
void
tx_thread(void *ptr)
{
while (1) {
int idxdeq = tx_avail();
if (idxdeq >= 0) {
char datval = DAT[idxdeq];
tx_advance();
write(serport,&datval,1);
}
}
}
更新:
I can certainly change it, no biggie, but I don't think its the problem I'm experiencing.
正如我所提到的,您肯定存在竞争条件。 Unfixed/latent 竞争条件 出现 非常像故障 H/W。在解决这些问题之前,您无法进一步推测。
If I was overwriting the transmitted memory region via the GenDat thread then I would expect to see fragments on the receiving side. What I'm seeing is a single incorrect byte that is affecting my packaging of data.
构建诊断模式[或两个] ...
只需让 GenThread 发送一个 递增 字节流(相对于 rand
/其他)。在这种模式下,完全跳过 UART TX。 TxThread 很容易注意到间隙[因为它应该看到序列 0x00-0xFF
无限重复]。
这将 运行 线程以更快的速度运行,并且更有可能出现竞争条件。
删除所有[调试] printf
。他们有可能扰乱时间的锁,所以你 不是 测量你的 [真实] 系统,而是 printf
的系统。 printf
调用 慢 并且一团糟。
您可以添加随机 nanosleep
调用以进一步强调设置。要有创意。使用原力的阴暗面来创建比真实系统所经历的多的测试。
您可以让 GenThread 故意 发送乱序字节以验证 TxThread 可以检测到间隙。
当您完成所有这些工作后,运行 一天左右的诊断模式,看看会发生什么。根据我的经验,您通常会在几分钟到一个小时内看到一些东西。当我进行测试时,我会 运行 作为验收测试过夜。
It looks like the uart TX hardware is glitching, is it hardware? LOL yes!!(i'm software) but its most definitely software...somewhere.
嗯……不太可能。我有直接 [commercial/product] 在 RPi 上使用 UART TX 的经验。
发送方系统和接收方系统之间的 RTS/CTS 设置可能不一致。或者,Tx/Rx 时钟频率/波特率略有偏差。
远程系统可能超过运行缓冲区(即)它跟不上[突发]数据速率。
接收器处理数据的速度可能很慢。您应该使用 hires 时间戳(例如 clock_gettime(CLOCK_MONOTONIC,...)
来标记字节到达等
我为此做的是有一个记录“事件”类型和时间戳的“跟踪缓冲区”。不幸的是 [对你 ;-)],我使用环形队列记录这些事件 [使用 stdatomic.h
函数,例如 atomic_compare_exchange_strong
] 到 add/remove 跟踪条目。
因此,您需要一个 solid 多线程环形队列实现来保存跟踪条目 [这里是先有鸡还是先有蛋的问题]。如果接收器类似于 FreeRTOS
,您将不得不处理 cli/sti
和其他裸机注意事项。
更新#2:
我写了一个小的 perl 脚本来分析和比较你的数据。这种分析也可以通过创建两个具有两位十六进制值的文件来完成,每行一个。这是 diff -u
输出:
--- tx.txt 2021-02-23 10:38:22.295135431 -0500
+++ rx.txt 2021-02-23 10:38:22.295135431 -0500
@@ -10,6 +10,7 @@
AD
EB
C9
+0D
0A
B2
1D
@@ -52,4 +53,3 @@
1D
A7
8C
-FF
设备接收到的数据与生成的序列相同除了生成的数据字节是:
0A
接收方得到:
0D 0A
主机 内核 TTY 层在看到 <LF>
.
时正在发送 <CR><LF>
这是因为当您设置 termios
参数时,您没有正确设置“原始”模式。
具体来说,您没有禁用“实现定义的输出处理”[根据 man termios
中的示例]。
即在Init
中需要添加:
options.c_oflag &= ~OPOST;
我 运行 我正在生成 运行dom 数据包,以便 运行 通过 Raspberry PI 3b+ 的 uart 发送 Raspbian OS。我发现在对 运行d() 调用 240 次左右后,我的数据包中生成了一个额外的字符,这可能导致 1 字节缓冲区超过 运行。因此,如果我的数据包的数据长度为 48 字节,则接收设备将在线路上看到 49 字节,但 t运行smitting 设备看不到。我已通过接收设备上的内存分析验证在线路上发送了 1 个额外字节。
有没有人 运行 遇到这样的情况:如果 write() 系统调用在写入 /dev/serial0 设备套接字时被中断,raspberry pi 将发送不正确的字节?
我最好的猜测是当 Linux 中的伪 运行dom 号码池在该系统调用期间通过内核刷新时。当我 t运行smit 一个 48 字节固定数据的数据数组时,我没有得到那个虚假字节 t运行smission,但是如果我自动生成 .[=18,我会得到那个虚假字节=]
我写的程序有2个pthreads。一个用于生成数据包,另一个用于 t运行 发送它们。 t运行smitting 线程在 t=0 时自动休眠,因为没有数据要发送。生成数据的线程会在生成 4 个数据包后唤醒 t运行smitting 线程。即使我在每次 t运行smission 后将 t运行smitting 线程休眠 1 秒,我也可以定期重现此问题。
我试图减少未使用的代码,但它仍然超过 300 行。这是一个常规问题,现在它和我玩猫捉老鼠。
To Build:
gcc -pthread -g -o exp exp.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256
#define DATPOOLSZ 16
typedef union __attribute__((packed)) PACKET
{
struct
{
uint8_t SID;
uint8_t DID;
uint8_t LEN;
uint8_t SPT;
uint8_t DPT;
uint8_t TID;
uint8_t DAT[249];//very last byte is the checksum
};
uint8_t packet[NETPAYLOADSZ];
}uPacket;
uPacket *DataPool[DATPOOLSZ];
enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };
struct SerRes
{
int sigint;
int serialFD;
int datInPoolIdx; //writen by GenDat, read by SerTx
int datOutPoolIdx;//written by SerTx
enum THRDSTAT statusGen;
enum THRDSTAT statusTX;
uint8_t dataAvail;
pthread_mutex_t sermut;
pthread_mutex_t genSleepmux;
pthread_mutex_t serTxSleepmux;
pthread_cond_t genDatSleep;
pthread_cond_t serTxSleep;
};
struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];
void SIGHandler(int num)
{
struct termios options;
if(serres.sigint == 1)
{
close(serres.serialFD);
serres.statusGen = eSTOPPED;
serres.statusTX = eSTOPPED;
exit(-1);
return;
}
tcgetattr(serres.serialFD, &options);
options.c_cc[VTIME] = 1; //timeout of 100ms
options.c_cc[VMIN] = 0; //1 receive atleast 1 character
tcsetattr(serres.serialFD, TCSANOW, &options);
serres.sigint = 1;
}
void Init(char *serpath)
{
struct termios options;
memset(&serres, 0, sizeof(struct SerRes));
memset(&DataPool, 0, sizeof(uPacket*)*DATPOOLSZ);
pthread_mutex_init(&serres.sermut, NULL);
pthread_mutex_init(&serres.genSleepmux, NULL);
pthread_cond_init(&serres.genDatSleep, NULL);
pthread_cond_init(&serres.serTxSleep, NULL);
serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
if(serres.serialFD < 0)
{
printf("\nError no is: %d", errno);
printf("\nError description: %s\n", strerror(errno));
exit(-1);
}
cfmakeraw(&options);
tcgetattr(serres.serialFD, &options);
options.c_cflag = CS8 | CLOCAL | CREAD;
options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
options.c_cc[VTIME] = 10; //timeout of 100ms
options.c_cc[VMIN] = 1; //1 receive atleast 1 character
cfsetspeed(&options, B115200); //set both input and output speed
tcsetattr(serres.serialFD, TCSANOW, &options);
}
void Deinit(void)
{
close(serres.serialFD);
pthread_mutex_destroy(&serres.sermut);
pthread_mutex_destroy(&serres.genSleepmux);
pthread_cond_destroy(&serres.genDatSleep);
}
void *DatGen(void *arg)
{
int randDev = 0;
uint8_t idx;
uint8_t cksum = 0;
uint8_t buffer[48] = {0x91, 0x39, 0x97, 0xb9, 0x50, 0x9b, 0x7a, 0x33, 0xe3, 0x1, 0xfa, 0x82, 0x61, 0xbd, 0xec, 0x1,\
0x8, 0x5, 0xd, 0x9c, 0x27, 0xcc, 0x4e, 0x8e, 0x63, 0x48, 0x37, 0x3b, 0x66, 0xde, 0x48, 0x77,\
0x98, 0xdf, 0x31, 0x68, 0xfa, 0x2b, 0x9b, 0x5f, 0x2c, 0x96, 0xe1, 0xd, 0x54, 0x4f, 0xf, 0x5c};
pid_t tid = (pid_t)syscall(__NR_gettid);
printf("\nDatGen - %d: Starting: \r\n", tid);
serres.statusGen = eRUNNING;
srand(0x089FFEE4);
while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if((serres.dataAvail == DATPOOLSZ))
{
printf("DatGen - %d: No more Data to Generate: Sleeping - %d\r\n", tid, serres.dataAvail );
serres.statusGen = eSLEEPING;
while(serres.dataAvail == DATPOOLSZ) //Gaurd against spurious wake up events
{
pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
}
printf("Datgen - %d: ******** Wokeup, running\r\n\n", tid);
if(serres.statusTX == eSTOPPED)
break;
serres.statusGen = eRUNNING;
}
pthread_mutex_unlock(&serres.genSleepmux);
//Time to wake up the SerTX thread? Maybe?
if(serres.dataAvail > 3 && serres.statusTX == eSLEEPING)
{
pthread_mutex_lock(&serres.serTxSleepmux);
pthread_cond_signal(&serres.serTxSleep);
pthread_mutex_unlock(&serres.serTxSleepmux);
}
//Generate the Packets.
idx = serres.datInPoolIdx;
serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
assert(serres.datInPoolIdx < DATPOOLSZ);
stDatPool[idx].SID = 0x55;
stDatPool[idx].DID = 0xAA;
stDatPool[idx].LEN = 0x30; //(rand() % 100);
stDatPool[idx].SPT = 0xEE;
stDatPool[idx].DPT = 0x22;
stDatPool[idx].TID = 0x77;
for(int i = 0; i < stDatPool[idx].LEN ; i++)
{
stDatPool[idx].DAT[i] = rand() % 100; //Only Write
cksum += stDatPool[idx].DAT[i];
}
stDatPool[idx].LEN += 7;
cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
cksum += stDatPool[idx].DPT + stDatPool[idx].TID;
stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail++; //Touched by both threads...
assert(serres.dataAvail < DATPOOLSZ+1);
if(serres.dataAvail == DATPOOLSZ)
{
printf("Max Dat Reached\r\n");
}
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
}
serres.statusGen = eSTOPPED;
pthread_exit(&serres.sigint);
}
#define LOOPCOUNT 8
void *SerTx(void *arg)
{
pid_t tid = (pid_t)syscall(__NR_gettid);
uint8_t idx = 0;
uint16_t randel = 0;
uint8_t count = 0;
uint8_t bytesSent = 0;
serres.statusTX = eRUNNING;
while(serres.sigint == 0 && serres.statusTX != eSTOPPED && count < LOOPCOUNT)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if(serres.dataAvail < 1)
{
pthread_cond_signal(&serres.genDatSleep);
printf("SerTx - %d: All Data Consumed\r\n", tid);
serres.statusTX = eSLEEPING;
while(serres.dataAvail < 1) //Gaurd against spurious wakeup events.
{
pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);
}
serres.statusTX = eRUNNING;
printf("SerTx - %d: ^^^^^^^ Woke up Running\r\n", tid);
}
pthread_mutex_unlock(&serres.genSleepmux);
//Output
idx = serres.datOutPoolIdx;
serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ - 1);
bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read
if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
{
printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
assert(0);
}
printf("Consume: %x\r\n", stDatPool[idx].LEN);
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail--; //shared write
assert(serres.dataAvail < DATPOOLSZ); //unsigned, so if it goes negative, it goes BIG!!
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
//usleep(1000000);
//tcflush(serres.serialFD, TCIOFLUSH);
count++;
}
serres.statusTX = eSTOPPED;
pthread_cond_signal(&serres.genDatSleep);
pthread_exit(&serres.sigint);
}
/*
* pthread DatGen generates the data for serTx
* pthread SerTx consumes the data generated by DatGen and sends out the serial port
*/
int main(int argc, char **argv)
{
pthread_t datGen, serRx, serTx;
pid_t pid = getpid();
int thrdstat = 0;
Init(SPORT);
signal(SIGINT, SIGHandler);
pthread_create(&datGen, NULL, DatGen, NULL);
pthread_create(&serTx, NULL, SerTx, NULL);
//Wait for all the threads to close
pthread_join(datGen,NULL);
pthread_join(serTx,NULL);
Deinit();
printf("\n>>>> End <<<<< %d\r\n", pid);
return 0;
}
This is generated and subsequently transmitted in bulk
packet =
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad,
0xeb, 0xc9, 0xa, 0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0, 0xc1,
0xa3, 0x0, 0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3 , 0x40,
0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20, 0xca,
0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9, 0x28,
0xd7, 0x1d, 0xa7, 0x8c, 0xff
This is received by the device.
packet =
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad,
0xeb, 0xc9, 0xd, 0xa, 0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0,
0xc1, 0xa3, 0x0, 0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3,
0x40, 0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20,
0xca, 0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9,
0x28, 0xd7, 0x1d, 0xa7, 0x8c
字节 22(0 索引)不正确。 0xD 在接收中代替 0xC9 和最后一个字节,0xFF 未被接收器正确接收。我没有提到没有流量控制,这是设计使然。
更新...
所有功劳归功于:Craig Estey
好的,我想我有决心了。我遇到的问题是没有正确设置串行套接字。我发现我的电话是:
cfmakeraw(&options);
在错误的位置。因此,c_oflags 的 OPOST 选项未取消断言。正如下面在答案更新中指出的那样,当内核在我的数据中看到 0xA
时,它会自动发送 0xD
。更正后的代码不再显示此行为。
其次有人指出我的生产者-消费者关系存在Race Condition。我不一定同意分析,但我还是看了看并更改了程序中定义的 full 和 empty 的方式。我想我本来打算走这条路,但由于 pthreads 中的信号丢失,我遇到了线程同步问题……烦人…… pthread 丢失唤醒:Lost wakeups in pthreads
似乎有效的最终代码:
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256
typedef union __attribute__((packed)) PACKET
{
struct
{
uint8_t SID;
uint8_t DID;
uint8_t LEN;
uint8_t SPT;
uint8_t DPT;
uint8_t TID;
uint8_t DAT[249];//very last byte is the checksum
};
uint8_t packet[NETPAYLOADSZ];
}uPacket;
enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };
struct SerRes
{
int sigint;
int serialFD;
uint16_t datInPoolIdx; //writen by GenDat, read by SerTx
uint16_t datOutPoolIdx;//written by SerTx
enum THRDSTAT statusGen;
enum THRDSTAT statusTX;
uint8_t genSig;
uint8_t txSig;
uint8_t StartCond;
uint8_t dataAvail;
pthread_mutex_t sermut;
pthread_mutex_t genSleepmux;
pthread_mutex_t serTxSleepmux;
pthread_cond_t genDatSleep;
pthread_cond_t serTxSleep;
};
#define DATPOOLSZ 16
struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];
void SIGHandler(int num)
{
struct termios options;
if(serres.sigint == 1)
{
close(serres.serialFD);
serres.statusGen = eSTOPPED;
serres.statusTX = eSTOPPED;
exit(-1);
return;
}
tcgetattr(serres.serialFD, &options);
options.c_cc[VTIME] = 1; //timeout of 100ms
options.c_cc[VMIN] = 0; //1 receive atleast 1 character
tcsetattr(serres.serialFD, TCSANOW, &options);
serres.sigint = 1;
}
void Init(char *serpath)
{
struct termios options;
memset(&serres, 0, sizeof(struct SerRes));
memset(&stDatPool, 0, sizeof(uPacket*)*DATPOOLSZ);
//serres.datInPoolIdx = 1; //starting condition
serres.StartCond = 1;
pthread_mutex_init(&serres.sermut, NULL);
pthread_mutex_init(&serres.genSleepmux, NULL);
pthread_cond_init(&serres.genDatSleep, NULL);
pthread_cond_init(&serres.serTxSleep, NULL);
serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
if(serres.serialFD < 0)
{
printf("\nError no is: %d", errno);
printf("\nError description: %s\n", strerror(errno));
exit(-1);
}
tcgetattr(serres.serialFD, &options);
cfmakeraw(&options);
options.c_cflag = CS8 | CLOCAL | CREAD;
options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
options.c_oflag &= ~OPOST;
options.c_cc[VTIME] = 10; //timeout of 100ms
options.c_cc[VMIN] = 1; //1 receive atleast 1 character
cfsetspeed(&options, B115200); //set both input and output speed
tcsetattr(serres.serialFD, TCSANOW, &options);
}
void Deinit(void)
{
close(serres.serialFD);
pthread_mutex_destroy(&serres.sermut);
pthread_mutex_destroy(&serres.genSleepmux);
pthread_cond_destroy(&serres.genDatSleep);
}
void *DatGen(void *arg)
{
int randDev = 0;
uint8_t idx;
uint8_t cksum = 0;
pid_t tid = (pid_t)syscall(__NR_gettid);
printf("\nDatGen - %d: Starting: \r\n", tid);
serres.statusGen = eRUNNING;
srand(0x089FFEE4);
serres.datInPoolIdx = 1;// (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
if(serres.datInPoolIdx == serres.datOutPoolIdx) //full condition
{
printf("DatGen - %d: Sleeping - %d\r\n", tid, serres.dataAvail );
serres.statusGen = eSLEEPING;
serres.StartCond = 0;
if(serres.statusTX == eSLEEPING)
{
serres.txSig = 1;
pthread_cond_signal(&serres.serTxSleep);
}
while((serres.datInPoolIdx == serres.datOutPoolIdx) && (serres.genSig == 0)) //Gaurd against spurious wake up events
{
serres.genSig = 0;
pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
}
serres.genSig = 0;
printf("Datgen - %d: Wokeup\r\n", tid);
if(serres.statusTX == eSTOPPED)
break;
serres.statusGen = eRUNNING;
}
idx = serres.datInPoolIdx;
assert(idx < DATPOOLSZ);
serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
pthread_mutex_unlock(&serres.genSleepmux);
//Time to wake up the SerTX thread? Maybe?
//Generate the Packets.
stDatPool[idx].SID = 0x55;
stDatPool[idx].DID = 0xAA;
stDatPool[idx].LEN = 0x30; //(rand() % 100);
stDatPool[idx].SPT = 0xEE;
stDatPool[idx].DPT = 0x22;
stDatPool[idx].TID = 0x77;
for(int i = 0; i < stDatPool[idx].LEN ; i++)
{
stDatPool[idx].DAT[i] = i; //rand() % 100; //Only Write
cksum += stDatPool[idx].DAT[i];
}
stDatPool[idx].LEN += 7;
cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
cksum += stDatPool[idx].DPT + stDatPool[idx].TID;
stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
serres.dataAvail++; //Touched by both threads...
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
}
serres.statusGen = eSTOPPED;
pthread_exit(&serres.sigint);
}
#define LOOPCOUNT 8
void *SerTx(void *arg)
{
pid_t tid = (pid_t)syscall(__NR_gettid);
uint8_t idx = 0;
uint16_t randel = 0;
uint32_t count = 0;
uint8_t bytesSent = 0;
serres.statusTX = eRUNNING;
printf("SerTx - %d: Starting\r\n", tid);
while(serres.sigint == 0 && serres.statusTX != eSTOPPED)// && count < LOOPCOUNT)
{
//Sleep Condition
pthread_mutex_lock(&serres.genSleepmux);
serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ -1);
if((serres.datOutPoolIdx == serres.datInPoolIdx)) //Empty Condition, sleep on first start.
{
if(serres.statusGen == eSLEEPING)
{
printf("Wake GenDat\r\n");
pthread_cond_signal(&serres.genDatSleep);
serres.genSig = 1;
}
printf("SerTx - %d: Sleep\r\n", tid);
serres.statusTX = eSLEEPING;
while((serres.datOutPoolIdx == serres.datInPoolIdx) && serres.txSig == 0) //Gaurd against spurious wakeup events.
{
serres.txSig = 0;
pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);
}
serres.txSig = 0;
serres.statusTX = eRUNNING;
printf("SerTx - %d: Running\r\n", tid);
}
idx = serres.datOutPoolIdx;
assert(idx < DATPOOLSZ);
pthread_mutex_unlock(&serres.genSleepmux);
//Output
if(stDatPool[idx].SID != 0x55)
assert(stDatPool[idx].SID == 0x55);
if(stDatPool[idx].DID != 0xAA)
assert(stDatPool[idx].DID != 0xAA);
if(stDatPool[idx].LEN != 0x37) //(rand() % 100);
assert(stDatPool[idx].LEN == 0x37);
if(stDatPool[idx].SPT != 0xEE)
assert(stDatPool[idx].SPT == 0xEE);
if(stDatPool[idx].DPT != 0x22)
assert(stDatPool[idx].DPT == 0x22);
if(stDatPool[idx].TID != 0x77)
assert(stDatPool[idx].TID == 0x77);
for(int i = 0; i < (stDatPool[idx].LEN-7); i++)
{
assert(stDatPool[idx].DAT[i] == i);
}
if(stDatPool[idx].packet[stDatPool[idx].LEN-1] != 0xFF)
assert(stDatPool[idx].packet[stDatPool[idx].LEN-1] == 0xFF);
/* bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read
if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
{
printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
assert(0);
}
*/
//printf("Consume: %d\r\n", stDatPool[idx].LEN);
/********* Critical Shared Section *************/
pthread_mutex_lock(&serres.sermut);
memset(&stDatPool[idx], 0, sizeof(stDatPool[idx]));
serres.dataAvail--; //shared write
pthread_mutex_unlock(&serres.sermut);
/*************************************************/
//usleep(1000000);
//tcflush(serres.serialFD, TCIOFLUSH);
count++;
}
serres.statusTX = eSTOPPED;
pthread_cond_signal(&serres.genDatSleep);
pthread_exit(&serres.sigint);
}
/*
* pthread DatGen generates the data for serTx
* pthread SerTx consumes the data generated by DatGen and sends out the serial port
*/
int main(int argc, char **argv)
{
pthread_t datGen, serRx, serTx;
pid_t pid = getpid();
int thrdstat = 0;
Init(SPORT);
signal(SIGINT, SIGHandler);
pthread_create(&datGen, NULL, DatGen, NULL);
pthread_create(&serTx, NULL, SerTx, NULL);
//Wait for all the threads to close
pthread_join(datGen,NULL);
while(serres.StartCond == 1);
pthread_join(serTx,NULL);
Deinit();
printf("\n>>>> End <<<<< %d\r\n", pid);
return 0;
}
编辑: 感谢您编辑您的问题并发布您生成的数据和相应的 实际 在远程接收的数据 设备.
您的问题[和解决方案]简单得多。请参阅下面的 UPDATE #2
部分。
您正在访问您的环队列索引变量(例如 datInPoolIdx
和 datOutPoolIdx
)在 锁定区域之外。
我很钦佩你为此付出的所有努力。但是,我认为你有点太复杂了。
你真的只需要在锁下访问索引变量。
松散地...
如果环形队列为空,Tx 线程应该休眠:
enqidx == deqidx
如果环队列已满,生成线程应该休眠:
((enqidx + 1) % DATPOOLSIZ) == deqidx
pthread_cond_wait/pthread_cond_signal
条件应基于使用类似上述方法对这些值的比较。
这是一些伪代码,大致基于您所拥有的。它没有任何条件变量,但我想您会了解如何在需要时添加它。
这些函数一次只能处理一个字节。但是,有一种方法可以修改它们,因此它们会产生 连续 字节的长度和数量,无论是免费的还是可用的,因此您可以使用 memcpy
移动一堆字节 into/out 批量队列。
int
queue_wrap_idx(int idx)
{
idx += 1;
idx %= DATPOOLSIZ;
return idx;
}
// gen_avail -- space available in queue
// RETURNS: index of place to store (or -1=full)
int
gen_avail(void)
{
lock();
int idxenq = datInPoolIdx;
int idxdeq = datOutPoolIdx;
unlock();
int idxfull = queue_wrap_idx(idxenq);
if (idxfull == idxdeq)
idxenq = -1;
return idxenq;
}
// gen_advance -- advance generator queue index
void
gen_advance(void)
{
lock();
int idxenq = datInPoolIdx;
idxenq = queue_wrap_idx(idxenq);
datInPoolIdx = idxenq;
unlock();
}
// tx_avail -- data available in queue
// RETURNS: index of place to dequeue (or -1=empty)
int
tx_avail(void)
{
lock();
int idxenq = datInPoolIdx;
int idxdeq = datOutPoolIdx;
unlock();
if (idxdeq == idxenq)
idxdeq = -1;
return idxdeq;
}
// tx_advance -- advance transmitter queue index
void
tx_advance(void)
{
lock();
int idxdeq = datOutPoolIdx;
idxdeq = queue_wrap_idx(idxdeq);
datOutPoolIdx = idxdeq;
unlock();
}
// gen_thread -- data generator thread
void
gen_thread(void *ptr)
{
while (1) {
int idxenq = gen_avail();
if (idxenq >= 0) {
DAT[idxenq] = rand();
gen_advance();
}
}
}
// tx_thread -- serial port transmit thread
void
tx_thread(void *ptr)
{
while (1) {
int idxdeq = tx_avail();
if (idxdeq >= 0) {
char datval = DAT[idxdeq];
tx_advance();
write(serport,&datval,1);
}
}
}
更新:
I can certainly change it, no biggie, but I don't think its the problem I'm experiencing.
正如我所提到的,您肯定存在竞争条件。 Unfixed/latent 竞争条件 出现 非常像故障 H/W。在解决这些问题之前,您无法进一步推测。
If I was overwriting the transmitted memory region via the GenDat thread then I would expect to see fragments on the receiving side. What I'm seeing is a single incorrect byte that is affecting my packaging of data.
构建诊断模式[或两个] ...
只需让 GenThread 发送一个 递增 字节流(相对于 rand
/其他)。在这种模式下,完全跳过 UART TX。 TxThread 很容易注意到间隙[因为它应该看到序列 0x00-0xFF
无限重复]。
这将 运行 线程以更快的速度运行,并且更有可能出现竞争条件。
删除所有[调试] printf
。他们有可能扰乱时间的锁,所以你 不是 测量你的 [真实] 系统,而是 printf
的系统。 printf
调用 慢 并且一团糟。
您可以添加随机 nanosleep
调用以进一步强调设置。要有创意。使用原力的阴暗面来创建比真实系统所经历的多的测试。
您可以让 GenThread 故意 发送乱序字节以验证 TxThread 可以检测到间隙。
当您完成所有这些工作后,运行 一天左右的诊断模式,看看会发生什么。根据我的经验,您通常会在几分钟到一个小时内看到一些东西。当我进行测试时,我会 运行 作为验收测试过夜。
It looks like the uart TX hardware is glitching, is it hardware? LOL yes!!(i'm software) but its most definitely software...somewhere.
嗯……不太可能。我有直接 [commercial/product] 在 RPi 上使用 UART TX 的经验。
发送方系统和接收方系统之间的 RTS/CTS 设置可能不一致。或者,Tx/Rx 时钟频率/波特率略有偏差。
远程系统可能超过运行缓冲区(即)它跟不上[突发]数据速率。
接收器处理数据的速度可能很慢。您应该使用 hires 时间戳(例如 clock_gettime(CLOCK_MONOTONIC,...)
来标记字节到达等
我为此做的是有一个记录“事件”类型和时间戳的“跟踪缓冲区”。不幸的是 [对你 ;-)],我使用环形队列记录这些事件 [使用 stdatomic.h
函数,例如 atomic_compare_exchange_strong
] 到 add/remove 跟踪条目。
因此,您需要一个 solid 多线程环形队列实现来保存跟踪条目 [这里是先有鸡还是先有蛋的问题]。如果接收器类似于 FreeRTOS
,您将不得不处理 cli/sti
和其他裸机注意事项。
更新#2:
我写了一个小的 perl 脚本来分析和比较你的数据。这种分析也可以通过创建两个具有两位十六进制值的文件来完成,每行一个。这是 diff -u
输出:
--- tx.txt 2021-02-23 10:38:22.295135431 -0500
+++ rx.txt 2021-02-23 10:38:22.295135431 -0500
@@ -10,6 +10,7 @@
AD
EB
C9
+0D
0A
B2
1D
@@ -52,4 +53,3 @@
1D
A7
8C
-FF
设备接收到的数据与生成的序列相同除了生成的数据字节是:
0A
接收方得到:
0D 0A
主机 内核 TTY 层在看到 <LF>
.
<CR><LF>
这是因为当您设置 termios
参数时,您没有正确设置“原始”模式。
具体来说,您没有禁用“实现定义的输出处理”[根据 man termios
中的示例]。
即在Init
中需要添加:
options.c_oflag &= ~OPOST;