如果 I/O read() 处于阻塞阶段,如何使用 Ctrl+C 退出 C++ 程序?

How to exit a C++ program with Ctrl+C, if I/O read() is in blocking stage?

我正在 ROS 环境中工作,并尝试在并行线程上读取 CANBUS。我在主线程中初始化了 canbus,因为我想确保连接了 CAN 电缆。通过初始化,我的意思是setsockopt()ioctl()bind()来配置套接字。

void readCanbus(int soktId) {
    while(true)
        int nbytes = read(soktId, ...);
}

int main() {
    int soktId;
    someSocketSetupFn(soktId);

    std::thread t(readCanbus, soktId);

    t.join();
}

问题:如果没有传入的 CAN 消息,read() 会被阻止。 Ctrl+C 不会终止 C++11 程序。

我怎样才能使 read() 终止并因此终止整个程序?

这个post提出了POSIX的解决方案。我正在使用 ubuntu16.04。

下面是一个小例子,说明如何使用自管道技巧使 I/O 线程在收到 CTRL-C 时正常退出。请注意,为简单起见,示例中的 I/O 事件循环是在 main() 线程中完成的,而不是在单独的线程中完成的,但是无论事件循环在哪个线程中,该技术都有效——信号-处理程序回调向管道的一端写入一个字节(),这导致管道另一端的线程select()-ing从管道的select到return fd 处于就绪读取状态。一旦它检测到(通过 FD_ISSET()),I/O 事件循环就知道是时候退出了。

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>

int _signalHandlerFD;

static void MySignalHandler(int sig)
{
   if (sig == SIGINT)
   {
      printf("Control-C/SIGINT detected!  Signaling main thread to shut down\n");
      char junk = 'x';
      if (write(_signalHandlerFD, &junk, sizeof(junk)) != sizeof(junk)) perror("send");
   }
}

/** Sets the given socket to blocking-mode (the default) or non-blocking mode
  * In order to make sure a given socket never blocks inside recv() or send(),
  * call SetSocketBlockingEnabled(fd, false)
  */
bool SetSocketBlockingEnabled(int fd, bool blocking)
{
   if (fd < 0) return false;

#ifdef _WIN32
   unsigned long mode = blocking ? 0 : 1;
   return (ioctlsocket(fd, FIONBIO, &mode) == 0) ? true : false;
#else
   int flags = fcntl(fd, F_GETFL, 0);
   if (flags == -1) return false;
   flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
   return (fcntl(fd, F_SETFL, flags) == 0) ? true : false;
#endif
}

int main(int, char **)
{
   // Create a pipe that our signal-handler can use
   // to signal our I/O thread
   int pipefds[2];
   if (pipe(pipefds) != 0)
   {
      perror("pipe");
      exit(10);
   }
   _signalHandlerFD = pipefds[1];  // the "write" end of the pipe

   // Install our signal handler
   if (signal(SIGINT, MySignalHandler) != 0)
   {
      perror("signal");
      exit(10);
   }

   // Now we do our I/O event loop (a real program might
   // do this in a separate thread, but it can work anywhere
   // so for simplicity I'm doing it here)
   const int timeToQuitFD = pipefds[0];
   while(1)
   {
      fd_set readFDs;
      FD_ZERO(&readFDs);
      FD_SET(timeToQuitFD, &readFDs);

      int maxFD = timeToQuitFD;

      // If you have other sockets you want to read from,
      // call FD_SET(theSocket, &readFDS) on them here, and
      // update maxFD be to the maximum socket-fd value across
      // of all of the sockets you want select() to watch

      // select() will not return until at least one socket
      // specified by readFDs is ready-to-be-read-from.
      if (select(maxFD+1, &readFDs, NULL, NULL, NULL) >= 0)
      {
         if (FD_ISSET(timeToQuitFD, &readFDs))
         {
            printf("Signal handler told the main thread it's time to quit!\n");
            break;
         }

         // also call FD_ISSET() on any other sockets here, and
         // read() from them iff it returns true
      }
      else if (errno != EINTR)
      {
         perror("select()");
         break;
      }
   }
   printf("main thread exiting, bye!\n");

   return 0;
}

如果您想模仿其他线程的中断行为,这些线程必须允许中断,并且您的信号处理线程必须将信号传递给它们。考虑以下片段:

static volatile std::atomic<bool> quit;
static volatile std::deque<std::thread> all;
static volatile pthread_t main_thread;

void sigint_handler (int) {
    if (pthread_self() == main_thread) {
        write(2, "\rQuitting.\n", 11);
        quit = true;
        for (auto &t : all) pthread_kill(t.native_handle(), SIGINT);
    } else if (!quit) pthread_kill(main_thread, SIGINT);
}

退出是通过设置全局变量来传达的。线程将唤醒并检查该变量。唤醒线程就是访问线程并向它发送信号。

如果工作线程在主线程之前拦截 SIGINT,然后它会将其发送到主线程以启动正确的关闭序列。

为了允许被中断,一个线程可以调用siginterrupt()

void readCanbus(int s) {
    siginterrupt(SIGINT, 1);
    while(!quit) {
        char buf[256];
        int nbytes = read(s, buf, sizeof(buf));
    }
    write(2, "Quit.\n", 6);
}

我们定义了两种安装信号处理程序的方法,一种使用 signal,另一种使用 sigaction,但使用提供类似于 signal.[=20 语义的标志=]

template <decltype(signal)>
void sighandler(int sig, sighandler_t handler) {
    signal(sig, handler);
}

template <decltype(sigaction)>
void sighandler(int sig, sighandler_t handler) {
    struct sigaction sa = {};
    sa.sa_handler = handler;
    sa.sa_flags = SA_RESTART;
    sigaction(sig, &sa, NULL);
}

主线程安装信号处理程序,初始化 main_thread,并使用稍后需要关闭的工作线程填充容器。

int main () {
    int sp[2];
    main_thread = pthread_self();
    socketpair(AF_UNIX, SOCK_STREAM, 0, sp);
    all.push_back(std::thread(readCanbus, sp[0]));
    sighandler<sigaction>(SIGINT, sigint_handler);
    for (auto &t : all) t.join();
}