c ++ Fast IPC - 提升消息队列似乎很慢?
c++ Fast IPC - boost message queue seems slow?
我有一个我自己似乎无法解决的问题。我有 Process1 在 while 循环中计算数据。这个过程必须尽快执行。我需要在Process1中计算的数据供以后分析,写入文件很慢。
我从未使用过 IPC,但我认为这是将 Process1 中的数据存储在内存中并从另一个时间不严格的 Process2(独立程序)访问它并将日期写入文件的好方法。
我已经创建了我的小测试程序(以了解 IPC)所以:
- Process1 将 运行 即使 Process2 不可访问 - 然后它将跳过 IPC 并只执行
- 当 运行ing Process2 时,它将等待 Process1 - 如果 Process1 启动,则获取数据,然后写入磁盘。
- Process2 将仅在 10 个样本以下获取 x 数量的数据 (maxRunTime)。
我目前创建的程序非常慢,当通过 IPC 发送消息时,速度慢了 6 倍。目前我每个 "TimeStep" 只传递三个浮点数,但这可能是 100。运行时间可能是 10.000。
待办事项:
如果有人能指导我正确的方向,我会很高兴。下面的代码是有效的,可能是运气不好,因为它不漂亮。
我需要找到一个尽可能快的解决方案,但不必是实时的。由于我不是专业程序员,我还需要在复杂性上妥协,因为我需要了解我在做什么。
希望有人能帮忙。
代码:
- 使用 Boost.1.59 和 MSVC 11。0_x86
- 两个独立的程序 - ConsoleApps
进程 1:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
bool InitCreateMsgQ()
{
bool initOK = false;
//Create a msgQ for parsing data
try
{
message_queue::remove("msgQData");
//Create a message_queue.
message_queue mqData
(open_or_create //create q
,"msgQData" //name
,1000000 //max message number
,sizeof(float) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
//Create State
try
{
message_queue::remove("msgState");
//Create a message_queue.
message_queue mqState
(open_or_create //create q
,"msgState" //name
,1 //max message number
,sizeof(int) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
return initOK;
}
bool SetState(int state)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
timeout = !mqState.timed_send(&state, sizeof(int), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgState");
timeout = true;
}
return timeout;
}
bool SetData(float data)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
timeout = !mqData.timed_send(&data, sizeof(float), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
//mqData.send(&data, sizeof(float), 0);
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgQData");
timeout = true;
}
return timeout;
}
int main ()
{
time_t start,end;
int runTime = 0; //just for testing
int dummyState = 2;
float x;
int state = 0;
if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
if (SetState(state)){state = 0;}// If timeout to set state go to state 0
//Do twice to get error if observer is not started
if (SetState(dummyState)){state = 0;}// Set Dummy state for obersver
// If timeout to set state go to state 0
time (&start);
//Runtime!
while(runTime<1000)
{
switch (state)
{
case 0:
state = 0;//force next state 0 - should not be needed
//Do nothing and break loop if monitor tool is not ready
break;
case 1:
state = 1;
cout << "Try SEND DATA" << endl;
for (int i = 0; i < 3; i++)
{
x = rand() % 100;
if (SetData(x)){state = 0;}
}
break;
default:
break;
}
runTime++;
cout << "runTime: " << runTime <<" state: " << state << endl;
}
message_queue::remove("msgQData");
message_queue::remove("msgState");
cout << "done - state: " << state << endl;
time (&end);
double dif = difftime (end,start);
printf ("Elasped time is %.2lf seconds.", dif );
getchar();
}
进程 2:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
ofstream debugOut; // Output file for debug (DEBUG)
int getState()
{
int state = 0;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){state = 0;}
return state;
}
float getData()
{
float Data = -123456;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
//Receive the data
//mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){Data = -123456;}
return Data;
}
int main ()
{
int state = 0;
int maxRunTime = 10;
float Data;
float DataArray[100000];
debugOut.open("IPCWriteTest.txt", std::ios::trunc);
debugOut.close();
while(true)
{
switch (state)
{
case 0:
//Do nothing - data not ready state
if(getState() == 1)
{
state = 1;
cout << "State: 1" <<endl;
} //If all msQ ok set state 1
else{state = 0;}
break;
case 1:
for (int runTime = 0; runTime < maxRunTime; runTime++)
{
cout << "runTime: " << runTime << " Data: ";
for (int i = 0; i < 3; i++)
{
Data = getData();
cout << Data << " ";
DataArray[runTime]=Data;
}
cout << endl;
}
debugOut.open("IPCWriteTest.txt", std::ios::app);
for (int runTime = 0; runTime < maxRunTime; runTime++)
{
debugOut << "runTime: " << runTime << " Data: ";
for (int i = 0; i < 3; i++)
{
debugOut << DataArray[runTime] << " ";
}
debugOut << endl;
}
debugOut.close();
state = 0;
break;
default:
break;
}
}
std::cout << "done" << endl;
getchar();
}
您正在为每个操作打开队列。
您应该尝试打开一次并传递对所有相关代码的引用(通常您会将其作为成员存储在 class 中)。
此外,拥有单独的队列是导致速度缓慢的原因。在我看来你是 "abusing" mqState
作为 interprocess::condition_variable
或信号量:
- http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.conditions
- http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.semaphores
无论如何,将异常转换为乏味的错误代码并不是很有效率。您正在手动执行异常处理应该执行的操作。
此外,将调试消息跟踪到标准输出这一事实将大大 减慢程序速度,尤其是 Windows
关于观察者的注释
情况相同,debugOutput
文件也不应连续重新打开。
"hardlooping" 成为三胞胎很奇怪。如果是队列,一次只弹出 1 条消息。如果消息 "logically" 由三个浮点数组成,则发送包含三个浮点数的消息。现在我什至认为这是一个错误:
for (int i = 0; i < 3; i++) {
data = getData();
std::cout << data << " ";
DataArray[runTime] = data;
}
它将三个不同的值分配给同一个索引(runTime
)...
简化代码
我 "reviewed it"(清理)之后的生产者代码:
#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct QueueLogic {
bool forced_remove = bip::message_queue::remove("msgQData");
bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
bool SetData(float data) {
return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
};
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
int main() {
std::vector<float> pre_calculated;
std::generate_n(back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });
auto start = Clock::now();
try {
QueueLogic instance;
for (auto v : pre_calculated)
instance.SetData(v);
} catch(std::exception const& e) {
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
throw;
}
auto end = Clock::now();
std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}
消费者代码:
#include <iostream>
#include <fstream>
#include <vector>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
struct ObserverLogic {
bip::message_queue mqData{bip::open_only, "msgQData"};
float getData() {
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
};
int main() {
std::vector<float> DataArray;
DataArray.reserve(100000);
ObserverLogic instance;
try {
while (DataArray.size() <= 100000) {
DataArray.push_back(instance.getData());
}
} catch (std::exception const &e) {
std::cout << "Exception caught: " << e.what() << "\n";
}
std::cout << "Received " << DataArray.size() << " messages\n";
std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));
std::cout << "\n\ndone" << std::endl;
}
备注
Live1 - Coliru
不允许共享内存
请在下面找到我更新的代码:使用 MSVC14 编译。
我现在只有一个问题。如果我在生产者 运行 时关闭我的消费者,它会停止吗?不知道为什么。
制作人
#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct QueueLogic
{
//DataConfig Setup
bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };
bool SetDataConfig(float data) {
return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
//Data Setup
bool forced_remove = bip::message_queue::remove("msgQData");
bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
bool SetData(float data) {
return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
};
int main()
{
time_t start, end;
time(&start);
float noVarsToMonitor = 10.f;
float minTimeStep = 1.f;// 0.001f;
std::vector<float> pre_calculated;
std::vector<float> data_config;
//Set Vars to monitor
data_config.push_back(noVarsToMonitor); //Add noVars as first param in vector
data_config.push_back(minTimeStep); //Add noVars as first param in vector
//Parse parameters into vector
std::generate_n(back_inserter(pre_calculated), noVarsToMonitor, [] { return rand() % 100; });
//Create instance of struct
QueueLogic instance;
//Setup data config
try
{
for (auto v : data_config)
{
instance.SetDataConfig(v);
}
}
catch (std::exception const& e)
{
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
bip::message_queue::remove("msgDataConfig");
throw;
}
//Get Data
for (size_t i = 0; i < 1000; i++) //simulate that code will be called 1000 times after data is recalculated
{
try
{
for (auto v : pre_calculated)
{
instance.SetData(v);
}
std::cout << "case: " << i << std::endl;
Sleep(20); //sleep to test code including consumer
}
catch (std::exception const& e)
{
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
bip::message_queue::remove("msgDataConfig");
throw;
}
}
time(&end);
double dif = difftime(end, start);
printf("Elasped time is %.2lf seconds.", dif);
getchar();
}
消费者:
#include <iostream>
#include <fstream>
#include <vector>
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct ObserverLogic
{
//Get Config Data
bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };
float getDataConfig()
{
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
//Get Var Data
bip::message_queue mqData{ bip::open_only, "msgQData" };
float getData()
{
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
};
int main() {
std::vector<float> DataArray;
int count = 0;
float maxMonitorTime = 10.f;
DataArray.reserve(100000);
//Fetch this from Producer
float noVarsToMonitor = 0.f;
float minTimeStep = 0.f;
float maxSimSamples = 0.f;
while (true)
{
try
{
ObserverLogic instance;
//Get Numbers of vars to monitor - used another thread!
noVarsToMonitor = instance.getDataConfig();
minTimeStep = instance.getDataConfig();
maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;
std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
std::cout << "minTimeStep: " << minTimeStep << std::endl;
std::cout << "maxSimSamples: " << maxSimSamples << std::endl;
std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer
//Get Var Data below here:
try
{
while (DataArray.size() <= maxSimSamples)
{
float value = instance.getData();
DataArray.push_back(value);
ofs << value << "; ";
count++;
if (count>noVarsToMonitor - 1) //Split Vector to match no Vars pr. Timestep
{
ofs << std::endl;
count = 0;
}
}
std::cout << "Received " << DataArray.size() << " messages\n";
std::cout << "\n\ndone" << std::endl;
std::cout << std::endl;
}
catch (std::exception const &e)
{
std::cout << "Exception caught: " << e.what() << "\n";
}
}
catch (std::exception const &e)
{
std::cout << "Exception caught: " << e.what() << "\n";
}
std::cout << "Wait 5 seconds to try fetch again" << "\n";
Sleep(5000); //sleep and wait to run loop again before looking at for msqQ
}
getchar();
}
输出到txt:
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
然后可以根据 "simulation time" 将数据绘制在右列和行中。
它可能仍然不是漂亮的代码,但我仍在学习,我很感谢我在第一次 post 时得到的支持。欢迎大家留下评论
我有一个我自己似乎无法解决的问题。我有 Process1 在 while 循环中计算数据。这个过程必须尽快执行。我需要在Process1中计算的数据供以后分析,写入文件很慢。
我从未使用过 IPC,但我认为这是将 Process1 中的数据存储在内存中并从另一个时间不严格的 Process2(独立程序)访问它并将日期写入文件的好方法。
我已经创建了我的小测试程序(以了解 IPC)所以:
- Process1 将 运行 即使 Process2 不可访问 - 然后它将跳过 IPC 并只执行
- 当 运行ing Process2 时,它将等待 Process1 - 如果 Process1 启动,则获取数据,然后写入磁盘。
- Process2 将仅在 10 个样本以下获取 x 数量的数据 (maxRunTime)。
我目前创建的程序非常慢,当通过 IPC 发送消息时,速度慢了 6 倍。目前我每个 "TimeStep" 只传递三个浮点数,但这可能是 100。运行时间可能是 10.000。
待办事项: 如果有人能指导我正确的方向,我会很高兴。下面的代码是有效的,可能是运气不好,因为它不漂亮。
我需要找到一个尽可能快的解决方案,但不必是实时的。由于我不是专业程序员,我还需要在复杂性上妥协,因为我需要了解我在做什么。
希望有人能帮忙。
代码:
- 使用 Boost.1.59 和 MSVC 11。0_x86
- 两个独立的程序 - ConsoleApps
进程 1:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
bool InitCreateMsgQ()
{
bool initOK = false;
//Create a msgQ for parsing data
try
{
message_queue::remove("msgQData");
//Create a message_queue.
message_queue mqData
(open_or_create //create q
,"msgQData" //name
,1000000 //max message number
,sizeof(float) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
//Create State
try
{
message_queue::remove("msgState");
//Create a message_queue.
message_queue mqState
(open_or_create //create q
,"msgState" //name
,1 //max message number
,sizeof(int) //max message size
);
initOK = true;
}
catch(interprocess_exception &ex)
{
return false;
}
return initOK;
}
bool SetState(int state)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
timeout = !mqState.timed_send(&state, sizeof(int), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgState");
timeout = true;
}
return timeout;
}
bool SetData(float data)
{
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
timeout = !mqData.timed_send(&data, sizeof(float), 0,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
//mqData.send(&data, sizeof(float), 0);
}
catch(interprocess_exception &ex)
{
message_queue::remove("msgQData");
timeout = true;
}
return timeout;
}
int main ()
{
time_t start,end;
int runTime = 0; //just for testing
int dummyState = 2;
float x;
int state = 0;
if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
if (SetState(state)){state = 0;}// If timeout to set state go to state 0
//Do twice to get error if observer is not started
if (SetState(dummyState)){state = 0;}// Set Dummy state for obersver
// If timeout to set state go to state 0
time (&start);
//Runtime!
while(runTime<1000)
{
switch (state)
{
case 0:
state = 0;//force next state 0 - should not be needed
//Do nothing and break loop if monitor tool is not ready
break;
case 1:
state = 1;
cout << "Try SEND DATA" << endl;
for (int i = 0; i < 3; i++)
{
x = rand() % 100;
if (SetData(x)){state = 0;}
}
break;
default:
break;
}
runTime++;
cout << "runTime: " << runTime <<" state: " << state << endl;
}
message_queue::remove("msgQData");
message_queue::remove("msgState");
cout << "done - state: " << state << endl;
time (&end);
double dif = difftime (end,start);
printf ("Elasped time is %.2lf seconds.", dif );
getchar();
}
进程 2:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>
#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock;
ofstream debugOut; // Output file for debug (DEBUG)
int getState()
{
int state = 0;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqState
(open_only //only oepn q
,"msgState" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){state = 0;}
return state;
}
float getData()
{
float Data = -123456;
bool timeout = true;
try
{
//Open a message queue.
message_queue mqData
(open_only //only oepn q
,"msgQData" //name
);
unsigned int priority;
message_queue::size_type recvd_size;
//Receive the data
//mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
}
catch(interprocess_exception &ex)
{
timeout = true;
}
if(timeout){Data = -123456;}
return Data;
}
int main ()
{
int state = 0;
int maxRunTime = 10;
float Data;
float DataArray[100000];
debugOut.open("IPCWriteTest.txt", std::ios::trunc);
debugOut.close();
while(true)
{
switch (state)
{
case 0:
//Do nothing - data not ready state
if(getState() == 1)
{
state = 1;
cout << "State: 1" <<endl;
} //If all msQ ok set state 1
else{state = 0;}
break;
case 1:
for (int runTime = 0; runTime < maxRunTime; runTime++)
{
cout << "runTime: " << runTime << " Data: ";
for (int i = 0; i < 3; i++)
{
Data = getData();
cout << Data << " ";
DataArray[runTime]=Data;
}
cout << endl;
}
debugOut.open("IPCWriteTest.txt", std::ios::app);
for (int runTime = 0; runTime < maxRunTime; runTime++)
{
debugOut << "runTime: " << runTime << " Data: ";
for (int i = 0; i < 3; i++)
{
debugOut << DataArray[runTime] << " ";
}
debugOut << endl;
}
debugOut.close();
state = 0;
break;
default:
break;
}
}
std::cout << "done" << endl;
getchar();
}
您正在为每个操作打开队列。
您应该尝试打开一次并传递对所有相关代码的引用(通常您会将其作为成员存储在 class 中)。
此外,拥有单独的队列是导致速度缓慢的原因。在我看来你是 "abusing" mqState
作为 interprocess::condition_variable
或信号量:
- http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.conditions
- http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.semaphores
无论如何,将异常转换为乏味的错误代码并不是很有效率。您正在手动执行异常处理应该执行的操作。
此外,将调试消息跟踪到标准输出这一事实将大大 减慢程序速度,尤其是 Windows
关于观察者的注释
情况相同,debugOutput
文件也不应连续重新打开。
"hardlooping" 成为三胞胎很奇怪。如果是队列,一次只弹出 1 条消息。如果消息 "logically" 由三个浮点数组成,则发送包含三个浮点数的消息。现在我什至认为这是一个错误:
for (int i = 0; i < 3; i++) {
data = getData();
std::cout << data << " ";
DataArray[runTime] = data;
}
它将三个不同的值分配给同一个索引(runTime
)...
简化代码
我 "reviewed it"(清理)之后的生产者代码:
#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct QueueLogic {
bool forced_remove = bip::message_queue::remove("msgQData");
bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
bool SetData(float data) {
return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
};
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
int main() {
std::vector<float> pre_calculated;
std::generate_n(back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });
auto start = Clock::now();
try {
QueueLogic instance;
for (auto v : pre_calculated)
instance.SetData(v);
} catch(std::exception const& e) {
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
throw;
}
auto end = Clock::now();
std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}
消费者代码:
#include <iostream>
#include <fstream>
#include <vector>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
struct ObserverLogic {
bip::message_queue mqData{bip::open_only, "msgQData"};
float getData() {
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
};
int main() {
std::vector<float> DataArray;
DataArray.reserve(100000);
ObserverLogic instance;
try {
while (DataArray.size() <= 100000) {
DataArray.push_back(instance.getData());
}
} catch (std::exception const &e) {
std::cout << "Exception caught: " << e.what() << "\n";
}
std::cout << "Received " << DataArray.size() << " messages\n";
std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));
std::cout << "\n\ndone" << std::endl;
}
备注
Live1 - Coliru
请在下面找到我更新的代码:使用 MSVC14 编译。
我现在只有一个问题。如果我在生产者 运行 时关闭我的消费者,它会停止吗?不知道为什么。
制作人
#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct QueueLogic
{
//DataConfig Setup
bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };
bool SetDataConfig(float data) {
return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
//Data Setup
bool forced_remove = bip::message_queue::remove("msgQData");
bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
bool SetData(float data) {
return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
}
};
int main()
{
time_t start, end;
time(&start);
float noVarsToMonitor = 10.f;
float minTimeStep = 1.f;// 0.001f;
std::vector<float> pre_calculated;
std::vector<float> data_config;
//Set Vars to monitor
data_config.push_back(noVarsToMonitor); //Add noVars as first param in vector
data_config.push_back(minTimeStep); //Add noVars as first param in vector
//Parse parameters into vector
std::generate_n(back_inserter(pre_calculated), noVarsToMonitor, [] { return rand() % 100; });
//Create instance of struct
QueueLogic instance;
//Setup data config
try
{
for (auto v : data_config)
{
instance.SetDataConfig(v);
}
}
catch (std::exception const& e)
{
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
bip::message_queue::remove("msgDataConfig");
throw;
}
//Get Data
for (size_t i = 0; i < 1000; i++) //simulate that code will be called 1000 times after data is recalculated
{
try
{
for (auto v : pre_calculated)
{
instance.SetData(v);
}
std::cout << "case: " << i << std::endl;
Sleep(20); //sleep to test code including consumer
}
catch (std::exception const& e)
{
std::cout << "Exception thrown: " << e.what() << "\n";
bip::message_queue::remove("msgQData");
bip::message_queue::remove("msgDataConfig");
throw;
}
}
time(&end);
double dif = difftime(end, start);
printf("Elasped time is %.2lf seconds.", dif);
getchar();
}
消费者:
#include <iostream>
#include <fstream>
#include <vector>
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct ObserverLogic
{
//Get Config Data
bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };
float getDataConfig()
{
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
//Get Var Data
bip::message_queue mqData{ bip::open_only, "msgQData" };
float getData()
{
float data;
bip::message_queue::size_type recvd_size;
unsigned int priority;
if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
{
throw std::runtime_error("timeout in timed_receive");
}
return data;
}
};
int main() {
std::vector<float> DataArray;
int count = 0;
float maxMonitorTime = 10.f;
DataArray.reserve(100000);
//Fetch this from Producer
float noVarsToMonitor = 0.f;
float minTimeStep = 0.f;
float maxSimSamples = 0.f;
while (true)
{
try
{
ObserverLogic instance;
//Get Numbers of vars to monitor - used another thread!
noVarsToMonitor = instance.getDataConfig();
minTimeStep = instance.getDataConfig();
maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;
std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
std::cout << "minTimeStep: " << minTimeStep << std::endl;
std::cout << "maxSimSamples: " << maxSimSamples << std::endl;
std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer
//Get Var Data below here:
try
{
while (DataArray.size() <= maxSimSamples)
{
float value = instance.getData();
DataArray.push_back(value);
ofs << value << "; ";
count++;
if (count>noVarsToMonitor - 1) //Split Vector to match no Vars pr. Timestep
{
ofs << std::endl;
count = 0;
}
}
std::cout << "Received " << DataArray.size() << " messages\n";
std::cout << "\n\ndone" << std::endl;
std::cout << std::endl;
}
catch (std::exception const &e)
{
std::cout << "Exception caught: " << e.what() << "\n";
}
}
catch (std::exception const &e)
{
std::cout << "Exception caught: " << e.what() << "\n";
}
std::cout << "Wait 5 seconds to try fetch again" << "\n";
Sleep(5000); //sleep and wait to run loop again before looking at for msqQ
}
getchar();
}
输出到txt:
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
41; 67; 34; 0; 69; 24; 78; 58; 62; 64;
然后可以根据 "simulation time" 将数据绘制在右列和行中。
它可能仍然不是漂亮的代码,但我仍在学习,我很感谢我在第一次 post 时得到的支持。欢迎大家留下评论