如何将 signal/data 从工作线程发送到主线程?
How to send signal/data from a worker thread to main thread?
首先我要说我是第一次深入研究多线程。尽管阅读了很多关于并发和同步的资料,但我并没有很容易地找到满足我的要求的解决方案。
使用 C++11 和 Boost,我试图找出如何将数据从工作线程发送到主线程。工作线程在应用程序启动时产生,并持续监视无锁队列。对象以不同的时间间隔填充此队列。这部分正在运行。
一旦数据可用,它就需要由主线程处理,因为另一个信号将被发送到不能在工作线程上的应用程序的其余部分。这就是我遇到的问题。
如果我必须通过互斥锁或条件变量阻塞主线程,直到工作线程完成,这将如何提高响应能力?我还不如只使用一个线程,这样我就可以访问数据。我一定是在这里遗漏了什么。
我 post 提出了几个问题,认为 Boost::Asio 是正确的选择。有一个关于如何在线程之间发送信号和数据的示例,但正如响应所示,事情很快变得过于复杂并且无法完美运行:
Boost::Asio with Main/Workers threads - Can I start event loop before posting work?
在与一些同事交谈后,建议使用两个队列——一个输入,一个输出。这将在共享 space 中并且输出队列将由工作线程填充。工作线程总是在运行,但可能需要一个计时器,可能在应用程序级别,这将强制主线程检查输出队列以查看是否有任何待处理的任务。
关于我应该把注意力放在哪里有什么想法吗?是否有任何技术或策略可能适用于我正在尝试做的事情?接下来我会看看定时器。
谢谢。
编辑: 这是用于 post 处理模拟结果的插件系统的生产代码。我们尽可能首先使用 C++11,然后是 Boost。我们正在使用 Boost 的 lockfree::queue。该应用程序在单个线程上执行我们想要的操作,但现在我们正在尝试优化我们发现存在性能问题的地方(在本例中,通过另一个库进行计算)。主线程有很多职责,包括数据库访问,这就是为什么我想限制工作线程实际做什么。
更新: 我已经成功地使用 std::thread 启动了一个检查 Boost lock::free 队列并处理放置在其中的任务的工作线程.这是我遇到麻烦的@Pressacco 回复中的第 5 步。有没有在工作线程完成时向主线程返回值并通知主线程的示例,而不是简单地等待工作线程完成?
如果您的 objective 从头开始开发解决方案(使用本机线程、队列等):
- 创建线程保存队列queue(Mutex/CriticalSection around add/remove)
- 创建一个与队列关联的计数信号量
- 让一个或多个工作线程等待计数信号量(即线程将阻塞)
- 信号量比让线程不断轮询队列更有效
- as messages/jobs 被添加到队列中,递增信号量
- 一个线程会被唤醒
- 线程应该删除一条消息
- 如果需要返回结果...
- 设置另一个:
Queue
+Semaphore
+WorkerThread
s
补充说明
如果您决定从头开始实现线程安全队列,请查看:
- Synchronization between threads using Critical Section
话虽如此,我还是要再看看 BOOST。我没有使用过这个库,但据我所知,它很可能包含一些相关的数据结构(例如线程安全队列)。
我最喜欢的 MSDN 名言:
"When you use multithreading of any sort, you potentially expose
yourself to very serious and complex bugs"
边栏
由于您是第一次接触并发编程,不妨考虑一下:
- 您的 objective 是要构建具有生产价值的代码,还是这只是一个学习练习?
- 生产?考虑我们现有的经过验证的库
- 学习?考虑从头开始编写代码
- 考虑使用带有异步回调的线程池而不是本机线程。
- 更多线程 != 更好
- 真的需要线程吗?
- 关注KISS principle.
上面的反馈让我找到了我需要的正确方向。该解决方案绝对比我之前尝试的必须使用 signals/slots 或 Boost::Asio 更简单。我有两个无锁队列,一个用于输入(在工作线程上),一个用于输出(在主线程上,由工作线程填充)。我使用计时器来安排何时处理输出队列。代码如下;也许它对某人有用:
//Task.h
#include <iostream>
#include <thread>
class Task
{
public:
Task(bool shutdown = false) : _shutdown(shutdown) {};
virtual ~Task() {};
bool IsShutdownRequest() { return _shutdown; }
virtual int Execute() = 0;
private:
bool _shutdown;
};
class ShutdownTask : public Task
{
public:
ShutdownTask() : Task(true) {}
virtual int Execute() { return -1; }
};
class TimeSeriesTask : public Task
{
public:
TimeSeriesTask(int value) : _value(value) {};
virtual int Execute()
{
std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl;
return _value * 2;
}
private:
int _value;
};
// Main.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include "afxwin.h"
#include <boost/lockfree/spsc_queue.hpp>
#include "Task.h"
static UINT_PTR ProcessDataCheckTimerID = 0;
static const int ProcessDataCheckPeriodInMilliseconds = 100;
class Manager
{
public:
Manager()
{
//Worker Thread with application lifetime that processes a lock free queue
_workerThread = std::thread(&Manager::ProcessInputData, this);
};
virtual ~Manager()
{
_workerThread.join();
};
void QueueData(int x)
{
if (x > 0)
{
_inputQueue.push(std::make_shared<TimeSeriesTask>(x));
}
else
{
_inputQueue.push(std::make_shared<ShutdownTask>());
}
}
void ProcessOutputData()
{
//process output data on the Main Thread
_outputQueue.consume_one([&](int value)
{
if (value < 0)
{
PostQuitMessage(WM_QUIT);
}
else
{
int result = value - 1;
std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl;
}
});
}
private:
void ProcessInputData()
{
bool shutdown = false;
//Worker Thread processes input data indefinitely
do
{
_inputQueue.consume_one([&](std::shared_ptr<Task> task)
{
std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;
if (task->IsShutdownRequest()) { shutdown = true; }
int result = task->Execute();
_outputQueue.push(result);
});
} while (shutdown == false);
}
std::thread _workerThread;
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _inputQueue;
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue;
};
std::shared_ptr<Manager> g_pMgr;
//timer to force Main Thread to process Manager's output queue
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT nIDEvent, DWORD dwTime)
{
if (nIDEvent == ProcessDataCheckTimerID)
{
KillTimer(NULL, ProcessDataCheckPeriodInMilliseconds);
ProcessDataCheckTimerID = 0;
//call function to process data
g_pMgr->ProcessOutputData();
//reset timer
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
}
}
int main()
{
std::cout << "Main thread is " << std::this_thread::get_id() << std::endl;
g_pMgr = std::make_shared<Manager>();
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
//queue up some dummy data
for (int i = 1; i <= 10; i++)
{
g_pMgr->QueueData(i);
}
//queue a shutdown request
g_pMgr->QueueData(-1);
//fake the application's message loop
MSG msg;
bool shutdown = false;
while (shutdown == false)
{
if (GetMessage(&msg, NULL, 0, 0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
else
{
shutdown = true;
}
}
return 0;
}
首先我要说我是第一次深入研究多线程。尽管阅读了很多关于并发和同步的资料,但我并没有很容易地找到满足我的要求的解决方案。
使用 C++11 和 Boost,我试图找出如何将数据从工作线程发送到主线程。工作线程在应用程序启动时产生,并持续监视无锁队列。对象以不同的时间间隔填充此队列。这部分正在运行。
一旦数据可用,它就需要由主线程处理,因为另一个信号将被发送到不能在工作线程上的应用程序的其余部分。这就是我遇到的问题。
如果我必须通过互斥锁或条件变量阻塞主线程,直到工作线程完成,这将如何提高响应能力?我还不如只使用一个线程,这样我就可以访问数据。我一定是在这里遗漏了什么。
我 post 提出了几个问题,认为 Boost::Asio 是正确的选择。有一个关于如何在线程之间发送信号和数据的示例,但正如响应所示,事情很快变得过于复杂并且无法完美运行:
Boost::Asio with Main/Workers threads - Can I start event loop before posting work?
在与一些同事交谈后,建议使用两个队列——一个输入,一个输出。这将在共享 space 中并且输出队列将由工作线程填充。工作线程总是在运行,但可能需要一个计时器,可能在应用程序级别,这将强制主线程检查输出队列以查看是否有任何待处理的任务。
关于我应该把注意力放在哪里有什么想法吗?是否有任何技术或策略可能适用于我正在尝试做的事情?接下来我会看看定时器。
谢谢。
编辑: 这是用于 post 处理模拟结果的插件系统的生产代码。我们尽可能首先使用 C++11,然后是 Boost。我们正在使用 Boost 的 lockfree::queue。该应用程序在单个线程上执行我们想要的操作,但现在我们正在尝试优化我们发现存在性能问题的地方(在本例中,通过另一个库进行计算)。主线程有很多职责,包括数据库访问,这就是为什么我想限制工作线程实际做什么。
更新: 我已经成功地使用 std::thread 启动了一个检查 Boost lock::free 队列并处理放置在其中的任务的工作线程.这是我遇到麻烦的@Pressacco 回复中的第 5 步。有没有在工作线程完成时向主线程返回值并通知主线程的示例,而不是简单地等待工作线程完成?
如果您的 objective 从头开始开发解决方案(使用本机线程、队列等):
- 创建线程保存队列queue(Mutex/CriticalSection around add/remove)
- 创建一个与队列关联的计数信号量
- 让一个或多个工作线程等待计数信号量(即线程将阻塞)
- 信号量比让线程不断轮询队列更有效
- as messages/jobs 被添加到队列中,递增信号量
- 一个线程会被唤醒
- 线程应该删除一条消息
- 如果需要返回结果...
- 设置另一个:
Queue
+Semaphore
+WorkerThread
s
- 设置另一个:
补充说明
如果您决定从头开始实现线程安全队列,请查看:
- Synchronization between threads using Critical Section
话虽如此,我还是要再看看 BOOST。我没有使用过这个库,但据我所知,它很可能包含一些相关的数据结构(例如线程安全队列)。
我最喜欢的 MSDN 名言:
"When you use multithreading of any sort, you potentially expose yourself to very serious and complex bugs"
边栏
由于您是第一次接触并发编程,不妨考虑一下:
- 您的 objective 是要构建具有生产价值的代码,还是这只是一个学习练习?
- 生产?考虑我们现有的经过验证的库
- 学习?考虑从头开始编写代码
- 考虑使用带有异步回调的线程池而不是本机线程。
- 更多线程 != 更好
- 真的需要线程吗?
- 关注KISS principle.
上面的反馈让我找到了我需要的正确方向。该解决方案绝对比我之前尝试的必须使用 signals/slots 或 Boost::Asio 更简单。我有两个无锁队列,一个用于输入(在工作线程上),一个用于输出(在主线程上,由工作线程填充)。我使用计时器来安排何时处理输出队列。代码如下;也许它对某人有用:
//Task.h
#include <iostream>
#include <thread>
class Task
{
public:
Task(bool shutdown = false) : _shutdown(shutdown) {};
virtual ~Task() {};
bool IsShutdownRequest() { return _shutdown; }
virtual int Execute() = 0;
private:
bool _shutdown;
};
class ShutdownTask : public Task
{
public:
ShutdownTask() : Task(true) {}
virtual int Execute() { return -1; }
};
class TimeSeriesTask : public Task
{
public:
TimeSeriesTask(int value) : _value(value) {};
virtual int Execute()
{
std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl;
return _value * 2;
}
private:
int _value;
};
// Main.cpp : Defines the entry point for the console application.
#include "stdafx.h"
#include "afxwin.h"
#include <boost/lockfree/spsc_queue.hpp>
#include "Task.h"
static UINT_PTR ProcessDataCheckTimerID = 0;
static const int ProcessDataCheckPeriodInMilliseconds = 100;
class Manager
{
public:
Manager()
{
//Worker Thread with application lifetime that processes a lock free queue
_workerThread = std::thread(&Manager::ProcessInputData, this);
};
virtual ~Manager()
{
_workerThread.join();
};
void QueueData(int x)
{
if (x > 0)
{
_inputQueue.push(std::make_shared<TimeSeriesTask>(x));
}
else
{
_inputQueue.push(std::make_shared<ShutdownTask>());
}
}
void ProcessOutputData()
{
//process output data on the Main Thread
_outputQueue.consume_one([&](int value)
{
if (value < 0)
{
PostQuitMessage(WM_QUIT);
}
else
{
int result = value - 1;
std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl;
}
});
}
private:
void ProcessInputData()
{
bool shutdown = false;
//Worker Thread processes input data indefinitely
do
{
_inputQueue.consume_one([&](std::shared_ptr<Task> task)
{
std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;
if (task->IsShutdownRequest()) { shutdown = true; }
int result = task->Execute();
_outputQueue.push(result);
});
} while (shutdown == false);
}
std::thread _workerThread;
boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _inputQueue;
boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue;
};
std::shared_ptr<Manager> g_pMgr;
//timer to force Main Thread to process Manager's output queue
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT nIDEvent, DWORD dwTime)
{
if (nIDEvent == ProcessDataCheckTimerID)
{
KillTimer(NULL, ProcessDataCheckPeriodInMilliseconds);
ProcessDataCheckTimerID = 0;
//call function to process data
g_pMgr->ProcessOutputData();
//reset timer
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
}
}
int main()
{
std::cout << "Main thread is " << std::this_thread::get_id() << std::endl;
g_pMgr = std::make_shared<Manager>();
ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback);
//queue up some dummy data
for (int i = 1; i <= 10; i++)
{
g_pMgr->QueueData(i);
}
//queue a shutdown request
g_pMgr->QueueData(-1);
//fake the application's message loop
MSG msg;
bool shutdown = false;
while (shutdown == false)
{
if (GetMessage(&msg, NULL, 0, 0))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
else
{
shutdown = true;
}
}
return 0;
}