c ++ Fast IPC - 提升消息队列似乎很慢?

c++ Fast IPC - boost message queue seems slow?

我有一个我自己似乎无法解决的问题。我有 Process1 在 while 循环中计算数据。这个过程必须尽快执行。我需要在Process1中计算的数据供以后分析,写入文件很慢。

我从未使用过 IPC,但我认为这是将 Process1 中的数据存储在内存中并从另一个时间不严格的 Process2(独立程序)访问它并将日期写入文件的好方法。

我已经创建了我的小测试程序(以了解 IPC)所以:

  1. Process1 将 运行 即使 Process2 不可访问 - 然后它将跳过 IPC 并只执行
  2. 当 运行ing Process2 时,它将等待 Process1 - 如果 Process1 启动,则获取数据,然后写入磁盘。
  3. Process2 将仅在 10 个样本以下获取 x 数量的数据 (maxRunTime)。

我目前创建的程序非常慢,当通过 IPC 发送消息时,速度慢了 6 倍。目前我每个 "TimeStep" 只传递三个浮点数,但这可能是 100。运行时间可能是 10.000。

待办事项: 如果有人能指导我正确的方向,我会很高兴。下面的代码是有效的,可能是运气不好,因为它不漂亮。

我需要找到一个尽可能快的解决方案,但不必是实时的。由于我不是专业程序员,我还需要在复杂性上妥协,因为我需要了解我在做什么。

希望有人能帮忙。

代码:

  1. 使用 Boost.1.59 和 MSVC 11。0_x86
  2. 两个独立的程序 - ConsoleApps

进程 1:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 


bool InitCreateMsgQ()
{
    bool initOK = false;
    //Create a msgQ for parsing data
    try
    {
        message_queue::remove("msgQData");
        //Create a message_queue.
        message_queue mqData
        (open_or_create     //create q 
        ,"msgQData"         //name
        ,1000000                //max message number
        ,sizeof(float)      //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
//Create State
    try
    {
        message_queue::remove("msgState");
        //Create a message_queue.
        message_queue mqState
        (open_or_create     //create q 
        ,"msgState"     //name
        ,1                  //max message number
        ,sizeof(int)        //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
    return initOK;
}
bool SetState(int state)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only oepn q
        ,"msgState"  //name
        );

        timeout = !mqState.timed_send(&state, sizeof(int), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgState");
        timeout = true;
    }
    return timeout;
}
bool SetData(float data)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only oepn q
        ,"msgQData"  //name
        );

        timeout = !mqData.timed_send(&data, sizeof(float), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
        //mqData.send(&data, sizeof(float), 0);
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgQData");
        timeout = true;
    }
    return timeout;
}

int main ()
{
    time_t start,end;

    int runTime = 0; //just for testing
    int dummyState = 2;
    float x;
    int state = 0;
    if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
    if (SetState(state)){state = 0;}// If timeout to set state go to state 0
    //Do twice to get error if observer is not started
    if (SetState(dummyState)){state = 0;}// Set Dummy state for obersver
                                         // If timeout to set state go to state 0

    time (&start);
    //Runtime!
    while(runTime<1000)
    {
        switch (state) 
        {
            case 0:
                state = 0;//force next state 0 - should not be needed
                //Do nothing and break loop if monitor tool is not ready                
                break;
            case 1:
                state = 1;
                cout << "Try SEND DATA" << endl;
                for (int i = 0; i < 3; i++)
                {
                    x = rand() % 100;
                    if (SetData(x)){state = 0;}
                }               
                break;
            default:
                break;
        }
        runTime++;
        cout << "runTime: " << runTime <<" state: " << state << endl;
    }

    message_queue::remove("msgQData");
    message_queue::remove("msgState");
    cout << "done - state: " << state << endl;

    time (&end);
    double dif = difftime (end,start);
    printf ("Elasped time is %.2lf seconds.", dif );

    getchar();
}

进程 2:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 

ofstream debugOut;      // Output file for debug    (DEBUG)

int getState()
{
    int state = 0;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only oepn q
        ,"msgState"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);    
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){state = 0;}

    return state;
}
float getData()
{
    float Data = -123456;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only oepn q
        ,"msgQData"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        //Receive the data
        //mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
        timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){Data = -123456;}

    return Data;
}

int main ()
{
    int state = 0;
    int maxRunTime = 10;
    float Data;
    float DataArray[100000];

    debugOut.open("IPCWriteTest.txt", std::ios::trunc);
    debugOut.close();

    while(true)
    {
        switch (state) 
        {
            case 0: 
                //Do nothing - data not ready state
                if(getState() == 1)
                {
                    state = 1;
                    cout << "State: 1" <<endl;
                } //If all msQ ok set state 1
                else{state = 0;}
                break;
            case 1:
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    cout << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        Data = getData();
                        cout << Data << "   ";
                        DataArray[runTime]=Data;
                    }   
                    cout << endl;
                }

                debugOut.open("IPCWriteTest.txt", std::ios::app);
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    debugOut << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        debugOut << DataArray[runTime] << " ";

                    }   
                    debugOut << endl;
                }
                debugOut.close();
                state = 0;
                break;
            default:
                break;
        }
    }

    std::cout << "done" << endl;
    getchar();
}

您正在为每个操作打开队列。

您应该尝试打开一次并传递对所有相关代码的引用(通常您会将其作为成员存储在 class 中)。

此外,拥有单独的队列是导致速度缓慢的原因。在我看来你是 "abusing" mqState 作为 interprocess::condition_variable 或信号量:

无论如何,将异常转换为乏味的错误代码并不是很有效率。您正在手动执行异常处理应该执行的操作。

此外,将调试消息跟踪到标准输出这一事实将大大 减慢程序速度,尤其是 Windows

关于观察者的注释

情况相同,debugOutput 文件也不应连续重新打开。

"hardlooping" 成为三胞胎很奇怪。如果是队列,一次只弹出 1 条消息。如果消息 "logically" 由三个浮点数组成,则发送包含三个浮点数的消息。现在我什至认为这是一个错误:

            for (int i = 0; i < 3; i++) {
                data = getData();
                std::cout << data << "   ";
                DataArray[runTime] = data;
            }

它将三个不同的值分配给同一个索引(runTime)...

简化代码

我 "reviewed it"(清理)之后的生产者代码:

Live1 On Coliru

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>

namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

struct QueueLogic {

    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

int main() {
    std::vector<float> pre_calculated;
    std::generate_n(back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });

    auto start = Clock::now();

    try {
        QueueLogic instance;

        for (auto v : pre_calculated)
            instance.SetData(v);

    } catch(std::exception const& e) {
        std::cout << "Exception thrown: " << e.what() << "\n";
        bip::message_queue::remove("msgQData");
        throw;
    }

    auto end = Clock::now();
    std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}

消费者代码:

Live1 On Coliru

#include <iostream>
#include <fstream>
#include <vector>

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

struct ObserverLogic {

    bip::message_queue mqData{bip::open_only, "msgQData"};

    float getData() {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
                                  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10))) 
        {
            throw std::runtime_error("timeout in timed_receive");
        }

        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    DataArray.reserve(100000);

    ObserverLogic instance;

    try {
        while (DataArray.size() <= 100000) {
            DataArray.push_back(instance.getData());
        }
    } catch (std::exception const &e) {
        std::cout << "Exception caught: " << e.what() << "\n";
    }

    std::cout << "Received " << DataArray.size() << " messages\n";
    std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));

    std::cout << "\n\ndone" << std::endl;
}

备注

Live1 - Coliru

不允许共享内存

请在下面找到我更新的代码:使用 MSVC14 编译。

我现在只有一个问题。如果我在生产者 运行 时关闭我的消费者,它会停止吗?不知道为什么。

制作人

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>

namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct QueueLogic 
{
    //DataConfig Setup
    bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
    bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };

    bool SetDataConfig(float data) {
        return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }

    //Data Setup
    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};



int main() 
{
    time_t start, end;
    time(&start);

    float noVarsToMonitor = 10.f;
    float minTimeStep = 1.f;// 0.001f;

    std::vector<float> pre_calculated;
    std::vector<float> data_config;

    //Set Vars to monitor
    data_config.push_back(noVarsToMonitor); //Add noVars as first param in vector
    data_config.push_back(minTimeStep); //Add noVars as first param in vector

    //Parse parameters into vector
    std::generate_n(back_inserter(pre_calculated), noVarsToMonitor, [] { return rand() % 100; });

    //Create instance of struct
    QueueLogic instance;

    //Setup data config
    try
    {       
        for (auto v : data_config)
        {
            instance.SetDataConfig(v);
        }
    }
    catch (std::exception const& e)
    {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
    }

    //Get Data
    for (size_t i = 0; i < 1000; i++) //simulate that code will be called 1000 times after data is recalculated
    {
        try
        {

            for (auto v : pre_calculated)
            {
                instance.SetData(v);
            }
            std::cout << "case: " << i << std::endl;
            Sleep(20); //sleep to test code including consumer
        }
        catch (std::exception const& e)
        {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
        }
    }

    time(&end);
    double dif = difftime(end, start);
    printf("Elasped time is %.2lf seconds.", dif);

    getchar();
}

消费者:

#include <iostream>
#include <fstream>
#include <vector>
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct ObserverLogic 
{
    //Get Config Data
    bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };

    float getDataConfig()
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }

    //Get Var Data
    bip::message_queue mqData{ bip::open_only, "msgQData" };

    float getData() 
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    int count = 0; 
    float maxMonitorTime = 10.f;
    DataArray.reserve(100000);

    //Fetch this from Producer
    float noVarsToMonitor = 0.f; 
    float minTimeStep = 0.f;
    float maxSimSamples = 0.f;

    while (true)
    {
        try
        {
            ObserverLogic instance;

            //Get Numbers of vars to monitor - used another thread!
            noVarsToMonitor = instance.getDataConfig();
            minTimeStep = instance.getDataConfig();
            maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;

            std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
            std::cout << "minTimeStep: " << minTimeStep << std::endl;
            std::cout << "maxSimSamples: " << maxSimSamples << std::endl;

            std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer

            //Get Var Data below here:
            try
            {
                while (DataArray.size() <= maxSimSamples)
                {
                    float value = instance.getData();
                    DataArray.push_back(value);
                    ofs << value << "; ";
                    count++;

                    if (count>noVarsToMonitor - 1) //Split Vector to match no Vars pr. Timestep
                    {
                        ofs << std::endl;
                        count = 0;
                    }
                }
                std::cout << "Received " << DataArray.size() << " messages\n";
                std::cout << "\n\ndone" << std::endl;
                std::cout << std::endl;
            }
            catch (std::exception const &e)
            {
                std::cout << "Exception caught: " << e.what() << "\n";
            }
        }
        catch (std::exception const &e)
        {
            std::cout << "Exception caught: " << e.what() << "\n";
        }
        std::cout << "Wait 5 seconds to try fetch again" << "\n";
        Sleep(5000); //sleep and wait to run loop again before looking at for msqQ
    }

    getchar();
}

输出到txt:

41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 

然后可以根据 "simulation time" 将数据绘制在右列和行中。

它可能仍然不是漂亮的代码,但我仍在学习,我很感谢我在第一次 post 时得到的支持。欢迎大家留下评论