如何中断被信号阻止的增强消息队列发送和接收?

How can I interrupt boost message queue send & receive that are being blocked by signals?

我有一个正在使用 boost 消息队列的进程。当由于已达到队列大小限制而在 sendreceive 中被阻塞时,如果我发送信号,函数调用似乎仍然处于阻塞状态。我希望调用取消或引发异常,但它并没有那样做。如何中断 sendreceive 函数调用?

#include <boost/interprocess/ipc/message_queue.hpp>
#include <signal.h>
#include <iostream>
#include <vector>

using namespace boost::interprocess;

static sig_atomic_t do_exit = 0;

void sig_handler(int sig)
{
  printf("signal %d", sig);
  do_exit = 1;
}

int main ()
{
  signal(SIGINT, sig_handler);

  try{
    //Erase previous message queue
    message_queue::remove("message_queue");

    //Create a message_queue.
    message_queue mq
        (create_only               //only create
            ,"message_queue"           //name
            ,5                       //max message number
            ,sizeof(int)               //max message size
        );

    //Send 100 numbers
    for(int i = 0; i < 100 && !do_exit; ++i){
      mq.send(&i, sizeof(i), 0);
      printf("%i\n", i);
    }
    printf("finished\n");
  }
  catch(interprocess_exception &ex){
    std::cout << ex.what() << std::endl;
    return 1;
  }
  catch(...) {
    std:: cout << "Exception" << std::endl;
  }

  return 0;
}

方法是使用定时接口:

for (int i = 0; i < 100 && !do_exit; ++i) {
    while (!do_exit) {
        if (mq.timed_send(&i, sizeof(i), 0, now() + 10ms)) {
            printf("%i\n", i);
            break;
        }
    }

    sleep_for(50ms);
}

例如:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <signal.h>
#include <vector>
#include <chrono>
#include <thread>

using namespace std::chrono_literals;
namespace bip = boost::interprocess;

static sig_atomic_t do_exit = 0;

void sig_handler(int sig)
{
    printf("signal %d\n", sig);
    do_exit = 1;
}

int main()
{
    auto now = std::chrono::steady_clock::now;
    using std::this_thread::sleep_for;
    signal(SIGINT, sig_handler);

    try {
        bip::message_queue::remove("message_queue");

        bip::message_queue mq(bip::create_only, // only create
                              "message_queue",  // name
                              5,                // max message number
                              sizeof(int)       // max message size
        );

        // Send 100 numbers
        for (int i = 0; i < 100 && !do_exit; ++i) {
            while (!do_exit) {
                if (mq.timed_send(&i, sizeof(i), 0, now() + 10ms)) {
                    printf("%i\n", i);
                    break;
                }
            }

            sleep_for(50ms);
        }
        printf("finished\n");
    } catch (bip::interprocess_exception const& ex) {
        std::cout << ex.what() << std::endl;
        return 1;
    } catch (...) {
        std::cout << "Exception" << std::endl;
        return 2;
    }
}

演示