所有被空管道读取阻塞的线程

All threads blocked by empty pipe read

我正在尝试自学 C 中的多线程和多进程编程 (Linux)。我写了一个简短的程序,它生成一个新线程,该线程进入一个例程,该例程试图从一个空的 FIFO 进行阻塞读取,而主线程继续并打印到 STDOUT。 (注意:在执行程序之前,我确实在终端中使用 mkfifo newfifo 创建了一个 FIFO)

我期待程序打印到屏幕 "Main thread",然后在等待我将数据放入 FIFO 时阻塞。相反,整个过程被阻塞,消息 "Main thread" 仅在我将数据放入 FIFO 后打印。

我是不是漏掉了什么?即使生成的线程被阻塞,主线程不应该继续 运行 吗?我尝试使用 fork 进行测试并创建子进程并得到相同的结果(两个进程都被从空 FIFO 中读取阻塞)。

代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <pthread.h>

#define NEWPIPE "./newfifo"

typedef struct __attribute__ ((__packed__)) {
  int reserved      :30;
  int threadStarted :1;
  int msgRcved      :1;
} Status;

void* thread_routine(int fd, char* buffer, Status* myStatus)
{
  int great_success = 0; 

  myStatus->threadStarted = 1;
  printf("Side thread\n");

  great_success = read(fd, buffer, sizeof(buffer));

  if (great_success < 0) {
    printf("pipe failed\n");
  } else {
    myStatus->msgRcved = 1;
  }
}

void main()
{
  int fd;
  int cnt = 0;
  char buffer[20];
  Status* myStatus;
  pthread_t thread_id;

  myStatus = (Status*) malloc(sizeof(Status));
  myStatus->reserved      = 0;
  myStatus->threadStarted  = 0;
  myStatus->msgRcved      = 0;

  fd = open(NEWPIPE, O_RDONLY);

  pthread_create(&thread_id, 
                  NULL, 
                  (void *) thread_routine(fd, buffer, myStatus), 
                  NULL);

  printf("Main thread\n");

  while (!myStatus->threadStarted) {
    printf("Main thread: side thread started!\n");
  }

  while (!myStatus->msgRcved) {
    sleep(1);
    cnt++;
  } 

  printf("buffer (cnt = %d): %s\n", cnt, buffer);

}

编辑:最新代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <pthread.h>

#define NEWPIPE "./newfifo"

struct Status {
  unsigned int reserved      :30;
  unsigned int threadStarted :1;
  unsigned int msgRcved      :1;
};

void* thread_routine(void *arg)
{
  int great_success = 0; 
  int fd;
  char buffer[20];
  struct Status* myStatus;

  fd = open(NEWPIPE, O_RDONLY);

  myStatus = arg;

  myStatus->threadStarted = 1;
  printf("Side thread\n");

  while (1) {
    great_success = read(fd, buffer, 20);

    if (great_success < 0) {
      printf("pipe failed\n");
    } else {
      printf("buffer : %s\n", buffer);
      printf("great_success = %d\n", great_success);
      great_success = 0;
    }
  }
}

void main()
{
  int cnt = 0;
  struct Status* myStatus;
  pthread_t thread_id;

  myStatus = (struct Status*) malloc(sizeof(struct Status));
  myStatus->reserved      = 0;
  myStatus->threadStarted  = 0;
  myStatus->msgRcved      = 0;

  pthread_create(&thread_id, 
                  NULL, 
                  &thread_routine,
                  (void *) myStatus);    // arguments to pass to the function!


  printf("Main thread\n");

  while (!myStatus->msgRcved) {
    printf("Main thread: side thread started!\n");

    if (myStatus->threadStarted) {
      printf("spawned thread started!\n");
    }

    sleep(1);
  }

  pthread_exit(NULL);

}

您正在将 调用 thread_routine() 的结果传递给 pthread_create()。必须在执行调用之前评估所有参数,因此直到该函数 returns 才会创建线程。大概。因为 thread_routine() 不是 return 和 (*)(void *),而是 pthread_create() 试图调用 return 值,就好像它是一个值一样,所以整个程序的行为是未定义的。你想传递一个指向函数的指针,而不是调用它的结果:

pthread_create(&thread_id, 
              NULL, 
              thread_routine, 
              NULL);

"But what about the arguments,"你问?这引出了下一点:函数 thread_routine() 没有线程启动例程的正确签名。线程启动例程必须接受 void * 类型的单个参数。 pthread_create() 的最后一个参数将作为其(单个)参数传递给指定的函数,您可以将其作为指向适当 struct 的指针来代替传递多个单独的参数。

最后,您假定的线程启动函数需要通过 returning 指针值(可能 NULL)或调用 pthread_exit() 退出。当 main() 以外的值 returning 函数到达其终端 } 而未执行 return 语句时,行为未定义。 (pthread_exit() 解决了这个问题,因为它没有 return。)

请注意,顺便说一下,您的编译器应该针对这段代码给出几个警告。您应该始终解决所有编译器警告,或者确定为什么不这样做是安全的。

Instead, the entire process is blocked, and the message "Main thread" only prints after I've put data into the FIFO.

Am I missing something here?

你的主线程被阻塞在这一行:

fd = open(NEWPIPE, O_RDONLY);

因为 FIFO 的非阻塞、只读打开将阻塞,直到写入器可用。你的主线程终于畅通了,不是你往FIFO写数据的时候,而是你干脆打开FIFO写的时候。

@JohnBollinger 代码中还有其他问题。但是,FIFO 语义是您没有看到预期的初始输出的原因。

此代码:

typedef struct __attribute__ ((__packed__)) {
    int reserved      :30;
    int threadStarted :1;
    int msgRcved      :1;
} Status;

会出现问题,因为代码使用的是带符号的值,并且结构定义不应像 typedef 一样进行 typedef:

  • 模糊代码,
  • 在 reader
  • 中产生混乱
  • 并使编译器名称混乱 space

这里是定义结构的首选方法示例 (并更正位字段的问题)

struct status  
{
    unsigned int reserved      :30;
    unsigned int threadStarted :1;
    unsigned int msgRcved      :1;
};

应该不需要 packed 属性,因为所有位都将适合单个 unsigned int 内存区域。通过以下方式引用此结构:struct status 在变量定义和函数参数列表中。

the following is a way to open a named pipe, 
so there is no need for any (before running application)
processing needed.

enum enumReturnStatus create_Pipe( INT32 taskSelector  )
{
    enum    enumReturnStatus returnStatus = eRS_Success; // indicate success
    char  *pTask_NameString               = NULL;
    char    Pipe_NameString[ 100 ]        = {'[=10=]'};
    struct  stat statInfo; // will contain info on a file 
                          // and is used to determine if the pipe already exists

    INT32 mkfifoStatus                    = 0;
    INT32 statStatus                      = 0;



    if( 0 >= Pipe_Parameters[ taskSelector ].Pipe_FD )
    { // then pipe not open

        pTask_NameString = get_pTask_NameString( taskSelector );

        if( NULL == pTask_NameString )
        { // task not configured

            return( eRS_Failure );
        }


        /* *********************************************************************
         *  implied else, task configured
         * ********************************************************************
         */


        // create pipe name string
        sprintf( Pipe_NameString, "/tmp/Pipe_2%s", pTask_NameString );



        // check if pipe already exists
        statStatus = stat(Pipe_NameString, &statInfo);

        if( (statStatus)&&(ENOENT == errno) )
        { // then, FIFO pipe does not exist

            // create the pipe
            umask(0);
            // maybe use mknode() instead of mkfifo()
            // mknod(pPipe_name, S_IFIFO | S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, 0 );

            // 0666 allows anyone to read/write the pipe
            mkfifoStatus = mkfifo( Pipe_NameString, 0666 );

            if ( -1 == mkfifoStatus )
            {
                CommonWriteCdsLog( eLL_Critical,
                    get_pFormatString(  eFormat_CFFL_string_string ),
                    __FILE__,  __LINE__,
                    "LibFunc:mkfifo() failed to create pipe ",
                    strerror( errno ) );

fprintf(stderr,"mkfifo failed: %s \n",strerror( errno ));
fflush(stderr);

                system( "sync; sync;" );
                exit( EXIT_FAILURE );
            }
        } // endif ( pipe doesn't exist )


        if( !mkfifoStatus && !statStatus )
        { // then pipe created or already exists

            if( 0 >= Pipe_Parameters[taskSelector].Pipe_FD )
            { // then, pipe not yet open

                // note: use O_RDWR so open will not hang
                Pipe_Parameters[taskSelector].Pipe_FD = open( Pipe_NameString, O_CREAT|O_RDWR );

                if( 0 >= Pipe_Parameters[taskSelector].Pipe_FD )
                { // then open failed

                    CommonWriteCdsLog( eLL_Critical, 
                        get_pFormatString(  eFormat_CFFL_string_string ), 
                        __FILE__,  __LINE__, 
                        "LibFunc:open() failed for pipe", 
                        strerror( errno ) );
                }

                else
                { // else open successful
                    ;
                } // endif( open for read successful )
            } // endif( pipe not already open )
        } // endif( pipe created or already exists )
    }  // endif( pipe not open )

    return( returnStatus );
} // end create_Pipe()