在 Raspberry Pi 3B+ 的 serial0 上输出时,当写入系统调用通过其他内核进程中断时,会传输一个虚假字节

When outputting on serial0 of the Rasberry Pi 3B+ a spurious byte is transmitted when write syscall is interrupted via other kernel processes

我 运行 我正在生成 运行dom 数据包,以便 运行 通过 Raspberry PI 3b+ 的 uart 发送 Raspbian OS。我发现在对 运行d() 调用 240 次左右后,我的数据包中生成了一个额外的字符,这可能导致 1 字节缓冲区超过 运行。因此,如果我的数据包的数据长度为 48 字节,则接收设备将在线路上看到 49 字节,但 t运行smitting 设备看不到。我已通过接收设备上的内存分析验证在线路上发送了 1 个额外字节。

有没有人 运行 遇到这样的情况:如果 write() 系统调用在写入 /dev/serial0 设备套接字时被中断,raspberry pi 将发送不正确的字节?

我最好的猜测是当 Linux 中的伪 运行dom 号码池在该系统调用期间通过内核刷新时。当我 t运行smit 一个 48 字节固定数据的数据数组时,我没有得到那个虚假字节 t运行smission,但是如果我自动生成 .[=18,我会得到那个虚假字节=]

我写的程序有2个pthreads。一个用于生成数据包,另一个用于 t运行 发送它们。 t运行smitting 线程在 t=0 时自动休眠,因为没有数据要发送。生成数据的线程会在生成 4 个数据包后唤醒 t运行smitting 线程。即使我在每次 t运行smission 后将 t运行smitting 线程休眠 1 秒,我也可以定期重现此问题。

我试图减少未使用的代码,但它仍然超过 300 行。这是一个常规问题,现在它和我玩猫捉老鼠。

To Build:
gcc -pthread -g -o exp exp.c
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>

#include <assert.h>


#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256

#define DATPOOLSZ 16


typedef union __attribute__((packed)) PACKET
{
    struct
    {
        uint8_t SID;
        uint8_t DID;
        uint8_t LEN;
        uint8_t SPT;
        uint8_t DPT;
        uint8_t TID;
        uint8_t DAT[249];//very last byte is the checksum
    };
    uint8_t packet[NETPAYLOADSZ];
}uPacket;

uPacket *DataPool[DATPOOLSZ];
enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };

struct SerRes
{
        int sigint;
        int serialFD;
        int datInPoolIdx; //writen by GenDat, read by SerTx
        int datOutPoolIdx;//written by SerTx
        enum THRDSTAT statusGen;
        enum THRDSTAT statusTX;
        uint8_t dataAvail;

        pthread_mutex_t sermut;
        pthread_mutex_t genSleepmux;
        pthread_mutex_t serTxSleepmux;

        pthread_cond_t genDatSleep;
        pthread_cond_t serTxSleep;
};

struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];

void SIGHandler(int num)
{
    struct termios options;

    if(serres.sigint == 1)
    {
        close(serres.serialFD);
        serres.statusGen = eSTOPPED;
        serres.statusTX = eSTOPPED;
        exit(-1);
        return;
    }
    tcgetattr(serres.serialFD, &options);
    options.c_cc[VTIME] = 1; //timeout of 100ms
    options.c_cc[VMIN]  = 0; //1 receive atleast 1 character

    tcsetattr(serres.serialFD, TCSANOW, &options);

    serres.sigint = 1;
}

void Init(char *serpath)
{
    struct termios options;

    memset(&serres, 0, sizeof(struct SerRes));
    memset(&DataPool, 0, sizeof(uPacket*)*DATPOOLSZ);

    pthread_mutex_init(&serres.sermut, NULL);
    pthread_mutex_init(&serres.genSleepmux, NULL);

    pthread_cond_init(&serres.genDatSleep, NULL);
    pthread_cond_init(&serres.serTxSleep, NULL);


    serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
    if(serres.serialFD < 0)
    {
        printf("\nError no is: %d", errno);
        printf("\nError description: %s\n", strerror(errno));
        exit(-1);
    }

    cfmakeraw(&options);
    tcgetattr(serres.serialFD, &options);
    options.c_cflag = CS8 | CLOCAL | CREAD;
    options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
    options.c_cc[VTIME] = 10; //timeout of 100ms
    options.c_cc[VMIN]  = 1; //1 receive atleast 1 character

    cfsetspeed(&options, B115200); //set both input and output speed

    tcsetattr(serres.serialFD, TCSANOW, &options);
}

void Deinit(void)
{
    close(serres.serialFD);
    pthread_mutex_destroy(&serres.sermut);
    pthread_mutex_destroy(&serres.genSleepmux);
    pthread_cond_destroy(&serres.genDatSleep);
}


void *DatGen(void *arg)
{
    int randDev = 0;
    uint8_t idx;
    uint8_t cksum = 0;

    uint8_t buffer[48] = {0x91, 0x39, 0x97, 0xb9, 0x50, 0x9b, 0x7a, 0x33, 0xe3, 0x1, 0xfa, 0x82, 0x61, 0xbd, 0xec, 0x1,\
                      0x8,  0x5,  0xd,  0x9c, 0x27, 0xcc, 0x4e, 0x8e, 0x63, 0x48, 0x37, 0x3b, 0x66, 0xde, 0x48, 0x77,\
                      0x98, 0xdf, 0x31, 0x68, 0xfa, 0x2b, 0x9b, 0x5f, 0x2c, 0x96, 0xe1, 0xd,  0x54, 0x4f, 0xf,  0x5c};

    pid_t tid = (pid_t)syscall(__NR_gettid);


    printf("\nDatGen - %d: Starting: \r\n", tid);
    serres.statusGen = eRUNNING;
    srand(0x089FFEE4);

    while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
    {

        //Sleep Condition
        pthread_mutex_lock(&serres.genSleepmux);
        if((serres.dataAvail == DATPOOLSZ))
        {
            printf("DatGen - %d: No more Data to Generate: Sleeping - %d\r\n", tid, serres.dataAvail );
            serres.statusGen = eSLEEPING;
            while(serres.dataAvail == DATPOOLSZ) //Gaurd against spurious wake up events
            {
                    pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
            }
            printf("Datgen - %d: ******** Wokeup, running\r\n\n", tid);

            if(serres.statusTX == eSTOPPED)
                    break;

            serres.statusGen = eRUNNING;
        }
        pthread_mutex_unlock(&serres.genSleepmux);

        //Time to wake up the SerTX thread? Maybe?
        if(serres.dataAvail > 3 && serres.statusTX == eSLEEPING)
        {
                    pthread_mutex_lock(&serres.serTxSleepmux);
                    pthread_cond_signal(&serres.serTxSleep);
                    pthread_mutex_unlock(&serres.serTxSleepmux);
        }

        //Generate the Packets.
        idx = serres.datInPoolIdx;

        serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);
        assert(serres.datInPoolIdx < DATPOOLSZ);
        stDatPool[idx].SID = 0x55;
        stDatPool[idx].DID = 0xAA;
        stDatPool[idx].LEN = 0x30; //(rand() % 100);
        stDatPool[idx].SPT = 0xEE;
        stDatPool[idx].DPT = 0x22;
        stDatPool[idx].TID = 0x77;


        for(int i = 0; i < stDatPool[idx].LEN ; i++)
        {
                stDatPool[idx].DAT[i] = rand() % 100; //Only Write
                cksum += stDatPool[idx].DAT[i];
        }

        stDatPool[idx].LEN += 7;

        cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
        cksum += stDatPool[idx].DPT + stDatPool[idx].TID;

        stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;

        /********* Critical Shared Section *************/
        pthread_mutex_lock(&serres.sermut);

                serres.dataAvail++; //Touched by both threads...
                assert(serres.dataAvail < DATPOOLSZ+1);
                if(serres.dataAvail == DATPOOLSZ)
                {
                        printf("Max Dat Reached\r\n");
                }

        pthread_mutex_unlock(&serres.sermut);
        /*************************************************/

    }
    serres.statusGen = eSTOPPED;
    pthread_exit(&serres.sigint);
}

#define LOOPCOUNT 8
void *SerTx(void *arg)
{
        pid_t tid = (pid_t)syscall(__NR_gettid);
        uint8_t idx = 0;
        uint16_t randel = 0;
        uint8_t count = 0;
        uint8_t bytesSent = 0;

        serres.statusTX = eRUNNING;

        while(serres.sigint == 0 && serres.statusTX != eSTOPPED && count < LOOPCOUNT)
        {

            //Sleep Condition
            pthread_mutex_lock(&serres.genSleepmux);
            if(serres.dataAvail < 1)
            {

                pthread_cond_signal(&serres.genDatSleep);
                printf("SerTx - %d: All Data Consumed\r\n", tid);

                serres.statusTX = eSLEEPING;
                while(serres.dataAvail < 1) //Gaurd against spurious wakeup events.
                {
                        pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);
                }

                serres.statusTX = eRUNNING;
                printf("SerTx - %d: ^^^^^^^ Woke up Running\r\n", tid);
            }
            pthread_mutex_unlock(&serres.genSleepmux);

            //Output
            idx = serres.datOutPoolIdx;
            serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ - 1);

            bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read

            if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
            {
                printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
                assert(0);
            }

            printf("Consume: %x\r\n", stDatPool[idx].LEN);

            /********* Critical Shared Section *************/
            pthread_mutex_lock(&serres.sermut);

                serres.dataAvail--; //shared write
                assert(serres.dataAvail < DATPOOLSZ); //unsigned, so if it goes negative, it goes BIG!!

            pthread_mutex_unlock(&serres.sermut);
            /*************************************************/

            //usleep(1000000);
            //tcflush(serres.serialFD, TCIOFLUSH);
            count++;
        }
        serres.statusTX = eSTOPPED;
        pthread_cond_signal(&serres.genDatSleep);
        pthread_exit(&serres.sigint);
}

/*
 *  pthread DatGen generates the data for serTx
 *  pthread SerTx consumes the data generated by DatGen and sends out the serial port
 */
int main(int argc, char **argv)
{
    pthread_t datGen, serRx, serTx;
    pid_t pid = getpid();
    int thrdstat = 0;


    Init(SPORT);
    signal(SIGINT, SIGHandler);
    pthread_create(&datGen, NULL, DatGen, NULL);
    pthread_create(&serTx,  NULL, SerTx,  NULL);


    //Wait for all the threads to close
    pthread_join(datGen,NULL);
    pthread_join(serTx,NULL);

    Deinit();
    printf("\n>>>> End <<<<< %d\r\n", pid);
    return 0;
}
This is generated and subsequently transmitted in bulk
packet = 
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad, 
0xeb, 0xc9, 0xa,  0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0, 0xc1, 
0xa3, 0x0,  0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3 , 0x40, 
0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20, 0xca, 
0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9, 0x28, 
0xd7, 0x1d, 0xa7, 0x8c, 0xff

This is received by the device.
packet = 
0x55, 0xaa, 0x37, 0xee, 0x22, 0x77, 0x4c, 0xbf, 0x8 , 0xad, 
0xeb, 0xc9, 0xd,  0xa,  0xb2, 0x1d, 0x45, 0x57, 0x48, 0xc0,  
0xc1, 0xa3, 0x0,  0xb4, 0x73, 0x91, 0x8b, 0x28, 0x17, 0x3, 
0x40, 0x62, 0x48, 0x86, 0xc7, 0x9e, 0x60, 0xc2, 0xea, 0x20,
0xca, 0x98, 0x8c, 0x94, 0x22, 0xbe, 0x32, 0x67, 0x96, 0xf9, 
0x28, 0xd7, 0x1d, 0xa7, 0x8c

字节 22(0 索引)不正确。 0xD 在接收中代替 0xC9 和最后一个字节,0xFF 未被接收器正确接收。我没有提到没有流量控制,这是设计使然。

更新...

所有功劳归功于:Craig Estey

好的,我想我有决心了。我遇到的问题是没有正确设置串行套接字。我发现我的电话是: cfmakeraw(&options); 在错误的位置。因此,c_oflags 的 OPOST 选项未取消断言。正如下面在答案更新中指出的那样,当内核在我的数据中看到 0xA 时,它会自动发送 0xD。更正后的代码不再显示此行为。

其次有人指出我的生产者-消费者关系存在Race Condition。我不一定同意分析,但我还是看了看并更改了程序中定义的 full 和 empty 的方式。我想我本来打算走这条路,但由于 pthreads 中的信号丢失,我遇到了线程同步问题……烦人…… pthread 丢失唤醒:Lost wakeups in pthreads

似乎有效的最终代码:

#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <termios.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>

#include <assert.h>


#define SPORT "//dev//serial0"
#define NETPAYLOADSZ 256

typedef union __attribute__((packed)) PACKET
{
    struct
    {
        uint8_t SID;
        uint8_t DID;
        uint8_t LEN;
        uint8_t SPT;
        uint8_t DPT;
        uint8_t TID;
        uint8_t DAT[249];//very last byte is the checksum
    };
    uint8_t packet[NETPAYLOADSZ];
}uPacket;

enum THRDSTAT { eNOTRUNNING, eRUNNING, eSLEEPING, eSTOPPED };

struct SerRes
{
    int sigint;
    int serialFD;
    uint16_t datInPoolIdx; //writen by GenDat, read by SerTx
    uint16_t datOutPoolIdx;//written by SerTx
    enum THRDSTAT statusGen;
    enum THRDSTAT statusTX;
    uint8_t genSig;
    uint8_t txSig;
    uint8_t StartCond;
    uint8_t dataAvail;

    pthread_mutex_t sermut;
    pthread_mutex_t genSleepmux;
    pthread_mutex_t serTxSleepmux;

    pthread_cond_t genDatSleep;
    pthread_cond_t serTxSleep;
};

#define DATPOOLSZ 16
struct SerRes serres;
uPacket stDatPool[DATPOOLSZ];

void SIGHandler(int num)
{
    struct termios options;

    if(serres.sigint == 1)
    {
        close(serres.serialFD);
        serres.statusGen = eSTOPPED;
        serres.statusTX = eSTOPPED;
        exit(-1);
        return;
    }
    tcgetattr(serres.serialFD, &options);
    options.c_cc[VTIME] = 1; //timeout of 100ms
    options.c_cc[VMIN]  = 0; //1 receive atleast 1 character

    tcsetattr(serres.serialFD, TCSANOW, &options);

    serres.sigint = 1;
}

void Init(char *serpath)
{
    struct termios options;

    memset(&serres, 0, sizeof(struct SerRes));
    memset(&stDatPool, 0, sizeof(uPacket*)*DATPOOLSZ);

    //serres.datInPoolIdx = 1; //starting condition
    serres.StartCond = 1;
    pthread_mutex_init(&serres.sermut, NULL);
    pthread_mutex_init(&serres.genSleepmux, NULL);

    pthread_cond_init(&serres.genDatSleep, NULL);
    pthread_cond_init(&serres.serTxSleep, NULL);


    serres.serialFD = open(serpath, O_RDWR | O_NOCTTY | O_NDELAY);
    if(serres.serialFD < 0)
    {
        printf("\nError no is: %d", errno);
        printf("\nError description: %s\n", strerror(errno));
        exit(-1);
    }


    tcgetattr(serres.serialFD, &options);
    cfmakeraw(&options);
    options.c_cflag = CS8 | CLOCAL | CREAD;
    options.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG );
    options.c_oflag &= ~OPOST;
    options.c_cc[VTIME] = 10; //timeout of 100ms
    options.c_cc[VMIN]  = 1; //1 receive atleast 1 character

    cfsetspeed(&options, B115200); //set both input and output speed

    tcsetattr(serres.serialFD, TCSANOW, &options);
}

void Deinit(void)
{
    close(serres.serialFD);
    pthread_mutex_destroy(&serres.sermut);
    pthread_mutex_destroy(&serres.genSleepmux);
    pthread_cond_destroy(&serres.genDatSleep);
}


void *DatGen(void *arg)
{
    int randDev = 0;
    uint8_t idx;
    uint8_t cksum = 0;

    pid_t tid = (pid_t)syscall(__NR_gettid);

    printf("\nDatGen - %d: Starting: \r\n", tid);
    serres.statusGen = eRUNNING;
    srand(0x089FFEE4);

    serres.datInPoolIdx = 1;// (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);

    while(serres.sigint == 0 && serres.statusGen != eSTOPPED)
    {

        //Sleep Condition
        pthread_mutex_lock(&serres.genSleepmux);
        if(serres.datInPoolIdx == serres.datOutPoolIdx) //full condition
        {
            printf("DatGen - %d: Sleeping - %d\r\n", tid, serres.dataAvail );
            serres.statusGen = eSLEEPING;
            serres.StartCond = 0;
            if(serres.statusTX == eSLEEPING)
            {
                serres.txSig = 1;
                pthread_cond_signal(&serres.serTxSleep);
            }

            while((serres.datInPoolIdx == serres.datOutPoolIdx) && (serres.genSig == 0)) //Gaurd against spurious wake up events
            {
                serres.genSig = 0;
                pthread_cond_wait(&serres.genDatSleep, &serres.genSleepmux);
            }
            serres.genSig = 0;
            printf("Datgen - %d: Wokeup\r\n", tid);

            if(serres.statusTX == eSTOPPED)
                    break;

            serres.statusGen = eRUNNING;
        }

        idx = serres.datInPoolIdx;
        assert(idx < DATPOOLSZ);
        serres.datInPoolIdx = (serres.datInPoolIdx + 1) & (DATPOOLSZ - 1);

        pthread_mutex_unlock(&serres.genSleepmux);

        //Time to wake up the SerTX thread? Maybe?

        //Generate the Packets.
        stDatPool[idx].SID = 0x55;
        stDatPool[idx].DID = 0xAA;
        stDatPool[idx].LEN = 0x30; //(rand() % 100);
        stDatPool[idx].SPT = 0xEE;
        stDatPool[idx].DPT = 0x22;
        stDatPool[idx].TID = 0x77;


        for(int i = 0; i < stDatPool[idx].LEN ; i++)
        {
            stDatPool[idx].DAT[i] = i; //rand() % 100; //Only Write
            cksum += stDatPool[idx].DAT[i];
        }

        stDatPool[idx].LEN += 7;

        cksum += stDatPool[idx].SID + stDatPool[idx].DID + stDatPool[idx].SPT + stDatPool[idx].LEN;
        cksum += stDatPool[idx].DPT + stDatPool[idx].TID;

        stDatPool[idx].packet[stDatPool[idx].LEN-1] = 0xFF;

        /********* Critical Shared Section *************/
        pthread_mutex_lock(&serres.sermut);

            serres.dataAvail++; //Touched by both threads...

        pthread_mutex_unlock(&serres.sermut);
        /*************************************************/

    }
    serres.statusGen = eSTOPPED;
    pthread_exit(&serres.sigint);
}

#define LOOPCOUNT 8
void *SerTx(void *arg)
{
    pid_t tid = (pid_t)syscall(__NR_gettid);
    uint8_t idx = 0;
    uint16_t randel = 0;
    uint32_t count = 0;
    uint8_t bytesSent = 0;

    serres.statusTX = eRUNNING;
    printf("SerTx - %d: Starting\r\n", tid);

    while(serres.sigint == 0 && serres.statusTX != eSTOPPED)// && count < LOOPCOUNT)
    {

        //Sleep Condition
        pthread_mutex_lock(&serres.genSleepmux);
        serres.datOutPoolIdx = (serres.datOutPoolIdx + 1) & (DATPOOLSZ -1);

        if((serres.datOutPoolIdx == serres.datInPoolIdx)) //Empty Condition, sleep on first start.
        {

            if(serres.statusGen == eSLEEPING)
            {
                    printf("Wake GenDat\r\n");
                    pthread_cond_signal(&serres.genDatSleep);
                    serres.genSig = 1;
            }

            printf("SerTx - %d: Sleep\r\n", tid);

            serres.statusTX = eSLEEPING;
            while((serres.datOutPoolIdx == serres.datInPoolIdx) && serres.txSig == 0) //Gaurd against spurious wakeup events.
            {
                    serres.txSig = 0;
                    pthread_cond_wait(&serres.serTxSleep, &serres.genSleepmux);

            }
            serres.txSig = 0;
            serres.statusTX = eRUNNING;
            printf("SerTx - %d: Running\r\n", tid);
        }

        idx = serres.datOutPoolIdx;
        assert(idx < DATPOOLSZ);


        pthread_mutex_unlock(&serres.genSleepmux);

        //Output

        if(stDatPool[idx].SID != 0x55)
                assert(stDatPool[idx].SID == 0x55);

        if(stDatPool[idx].DID != 0xAA)
                assert(stDatPool[idx].DID != 0xAA);

        if(stDatPool[idx].LEN != 0x37) //(rand() % 100);
                assert(stDatPool[idx].LEN == 0x37);

        if(stDatPool[idx].SPT != 0xEE)
                assert(stDatPool[idx].SPT == 0xEE);

        if(stDatPool[idx].DPT != 0x22)
                assert(stDatPool[idx].DPT == 0x22);

        if(stDatPool[idx].TID != 0x77)
                assert(stDatPool[idx].TID == 0x77);

        for(int i = 0; i < (stDatPool[idx].LEN-7); i++)
        {
                assert(stDatPool[idx].DAT[i] == i);
        }

        if(stDatPool[idx].packet[stDatPool[idx].LEN-1] != 0xFF)
                assert(stDatPool[idx].packet[stDatPool[idx].LEN-1] == 0xFF);

        /*  bytesSent = write(serres.serialFD, &stDatPool[idx].packet[0], stDatPool[idx].LEN); //only Read

        if(stDatPool[idx].LEN != bytesSent) //did we not send all the bytes?
        {
            printf("Pkt Len: %x\nBytesSent: %x\r\n", stDatPool[idx].LEN, bytesSent);
            assert(0);
        }
                */
        //printf("Consume: %d\r\n", stDatPool[idx].LEN);

        /********* Critical Shared Section *************/
        pthread_mutex_lock(&serres.sermut);

            memset(&stDatPool[idx], 0, sizeof(stDatPool[idx]));
            serres.dataAvail--; //shared write

        pthread_mutex_unlock(&serres.sermut);
        /*************************************************/

        //usleep(1000000);
        //tcflush(serres.serialFD, TCIOFLUSH);
        count++;

    }
    serres.statusTX = eSTOPPED;
    pthread_cond_signal(&serres.genDatSleep);
    pthread_exit(&serres.sigint);
}


/*
 *  pthread DatGen generates the data for serTx
 *  pthread SerTx consumes the data generated by DatGen and sends out the serial                                                                                          port
 */
int main(int argc, char **argv)
{
    pthread_t datGen, serRx, serTx;
    pid_t pid = getpid();
    int thrdstat = 0;


    Init(SPORT);
    signal(SIGINT, SIGHandler);
    pthread_create(&datGen, NULL, DatGen, NULL);
    pthread_create(&serTx,  NULL, SerTx,  NULL);


    //Wait for all the threads to close
    pthread_join(datGen,NULL);
    while(serres.StartCond == 1);
    pthread_join(serTx,NULL);

    Deinit();
    printf("\n>>>> End <<<<< %d\r\n", pid);
    return 0;
}


编辑: 感谢您编辑您的问题并发布您生成的数据和相应的 实际 在远程接收的数据 设备.

您的问题[和解决方案]简单得多。请参阅下面的 UPDATE #2 部分。


您正在访问您的环队列索引变量(例如 datInPoolIdxdatOutPoolIdx 锁定区域之外。

我很钦佩你为此付出的所有努力。但是,我认为你有点太复杂了。

你真的只需要在锁下访问索引变量。

松散地...

如果环形队列为空,Tx 线程应该休眠:

enqidx == deqidx

如果环队列已满,生成线程应该休眠:

((enqidx + 1) % DATPOOLSIZ) == deqidx

pthread_cond_wait/pthread_cond_signal 条件应基于使用类似上述方法对这些值的比较。


这是一些伪代码,大致基于您所拥有的。它没有任何条件变量,但我想您会了解如何在需要时添加它。

这些函数一次只能处理一个字节。但是,有一种方法可以修改它们,因此它们会产生 连续 字节的长度和数量,无论是免费的还是可用的,因此您可以使用 memcpy 移动一堆字节 into/out 批量队列。

int
queue_wrap_idx(int idx)
{

    idx += 1;
    idx %= DATPOOLSIZ;

    return idx;
}

// gen_avail -- space available in queue
// RETURNS: index of place to store (or -1=full)
int
gen_avail(void)
{

    lock();
    int idxenq = datInPoolIdx;
    int idxdeq = datOutPoolIdx;
    unlock();

    int idxfull = queue_wrap_idx(idxenq);

    if (idxfull == idxdeq)
        idxenq = -1;

    return idxenq;
}

// gen_advance -- advance generator queue index
void
gen_advance(void)
{
    lock();

    int idxenq = datInPoolIdx;
    idxenq = queue_wrap_idx(idxenq);
    datInPoolIdx = idxenq;

    unlock();
}

// tx_avail -- data available in queue
// RETURNS: index of place to dequeue (or -1=empty)
int
tx_avail(void)
{

    lock();
    int idxenq = datInPoolIdx;
    int idxdeq = datOutPoolIdx;
    unlock();

    if (idxdeq == idxenq)
        idxdeq = -1;

    return idxdeq;
}

// tx_advance -- advance transmitter queue index
void
tx_advance(void)
{
    lock();

    int idxdeq = datOutPoolIdx;
    idxdeq = queue_wrap_idx(idxdeq);
    datOutPoolIdx = idxdeq;

    unlock();
}

// gen_thread -- data generator thread
void
gen_thread(void *ptr)
{

    while (1) {
        int idxenq = gen_avail();

        if (idxenq >= 0) {
            DAT[idxenq] = rand();
            gen_advance();
        }
    }
}

// tx_thread -- serial port transmit thread
void
tx_thread(void *ptr)
{

    while (1) {
        int idxdeq = tx_avail();

        if (idxdeq >= 0) {
            char datval = DAT[idxdeq];
            tx_advance();
            write(serport,&datval,1);
        }
    }
}

更新:

I can certainly change it, no biggie, but I don't think its the problem I'm experiencing.

正如我所提到的,您肯定存在竞争条件。 Unfixed/latent 竞争条件 出现 非常像故障 H/W。在解决这些问题之前,您无法进一步推测。

If I was overwriting the transmitted memory region via the GenDat thread then I would expect to see fragments on the receiving side. What I'm seeing is a single incorrect byte that is affecting my packaging of data.

构建诊断模式[或两个] ...

只需让 GenThread 发送一个 递增 字节流(相对于 rand/其他)。在这种模式下,完全跳过 UART TX。 TxThread 很容易注意到间隙[因为它应该看到序列 0x00-0xFF 无限重复]。

这将 运行 线程以更快的速度运行,并且更有可能出现竞争条件。

删除所有[调试] printf。他们有可能扰乱时间的锁,所以你 不是 测量你的 [真实] 系统,而是 printf 的系统。 printf 调用 并且一团糟。

您可以添加随机 nanosleep 调用以进一步强调设置。要有创意。使用原力的阴暗面来创建比真实系统所经历的的测试。

您可以让 GenThread 故意 发送乱序字节以验证 TxThread 可以检测到间隙。

当您完成所有这些工作后,运行 一天左右的诊断模式,看看会发生什么。根据我的经验,您通常会在几分钟到一个小时内看到一些东西。当我进行测试时,我会 运行 作为验收测试过夜。

It looks like the uart TX hardware is glitching, is it hardware? LOL yes!!(i'm software) but its most definitely software...somewhere.

嗯……不太可能。我有直接 [commercial/product] 在 RPi 上使用 UART TX 的经验。

发送方系统和接收方系统之间的 RTS/CTS 设置可能不一致。或者,Tx/Rx 时钟频率/波特率略有偏差。

远程系统可能超过运行缓冲区(即)它跟不上[突发]数据速率。

接收器处理数据的速度可能很慢。您应该使用 hires 时间戳(例如 clock_gettime(CLOCK_MONOTONIC,...) 来标记字节到达等

我为此做的是有一个记录“事件”类型和时间戳的“跟踪缓冲区”。不幸的是 [对你 ;-)],我使用环形队列记录这些事件 [使用 stdatomic.h 函数,例如 atomic_compare_exchange_strong] 到 add/remove 跟踪条目。

因此,您需要一个 solid 多线程环形队列实现来保存跟踪条目 [这里是先有鸡还是先有蛋的问题]。如果接收器类似于 FreeRTOS,您将不得不处理 cli/sti 和其他裸机注意事项。


更新#2:

我写了一个小的 perl 脚本来分析和比较你的数据。这种分析也可以通过创建两个具有两位十六进制值的文件来完成,每行一个。这是 diff -u 输出:

--- tx.txt  2021-02-23 10:38:22.295135431 -0500
+++ rx.txt  2021-02-23 10:38:22.295135431 -0500
@@ -10,6 +10,7 @@
 AD
 EB
 C9
+0D
 0A
 B2
 1D
@@ -52,4 +53,3 @@
 1D
 A7
 8C
-FF

设备接收到的数据与生成的序列相同除了生成的数据字节是:

0A

接收方得到:

0D 0A

主机 内核 TTY 层在看到 <LF>.

时正在发送 <CR><LF>

这是因为当您设置 termios 参数时,您没有正确设置“原始”模式。

具体来说,您没有禁用“实现定义的输出处理”[根据 man termios 中的示例]。

即在Init中需要添加:

options.c_oflag &= ~OPOST;