启动多个线程并重新启动它们
Launching multiple threads and restarting them
我正在尝试编写一个创建 x 个工作线程的系统。这些线程将在不同的时间完成它们的工作。当他们中的任何一个完成他们的工作时,我将检查他们的输出并再次重新启动他们(将 运行 线程数保持在 x 左右)。我将对多次任意迭代执行此操作。因此,基本上一个控制器线程将启动 x 数量的线程,并在它们完成工作后重新启动它们,直到达到一定的迭代次数。
附加说明#1:当我说重启时,等到当前 exits/aborts 被销毁并创建一个新的就完全没问题了。它不必 "restart" 相同的线程。我最感兴趣的是以一种干净的异步方式来做这件事。
注意:我不是在寻找任何特定代码,而是在寻找一些可能的伪代码和使用槽和信号的设计模式。
我知道 qt 线程并使用过它们。我熟悉启动 x 数量的线程并等待所有线程使用 yield 完成并等待的示例。我正在寻找一种干净的方法来使用信号和插槽实现我在第一段中描述的内容。
每个线程都有以下内容:
- 布尔退出
- 在启动线程之前,将其设置为false
- 主线程只应将其设置为 true。订单设置标志为真,加入线程
- 工作线程应该只将它设置为 false。 Order 正在退出循环,进行任何 post 处理,将其设置为 false,返回。
- 信号量(不需要计数器,只是 post / 停止,就像在 pthreads 中一样)
- 主线程将post
- worker thread 将 运行 in a
while( !exit )
,循环中的第一个命令是等待信号量。
- bool threadWorking
- 在启动线程之前,将其设置为false
- 主线程只应将其设置为 true。命令是:准备好数据,将标志设置为真,post semaphore.
- 工作线程应该只将它设置为 false。或者是:准备好答案,通知主线程,将标志设置为 false。
这样你就可以重复使用线程了。
这就是 QtConcurrent::run()
或 QThreadPool::start()
的用途。 Concurrent 框架在内部使用线程池,因此它们是相当等价的:前者是后者的便利包装器。默认线程池最好留给 short-运行ning 任务;对于 运行 长任务,请使用您自己的线程池。您将把它作为第一个参数传递给 QtConcurrent::run()
。
QThreadPool
维护一个工作项队列,将它们分派给线程,并动态创建和销毁工作线程。您不必自己重新实现,这真是太棒了 class。
如果您没有太多的工作单元并且可以预先全部提供,只需使用 QtConcurrent::run()
或 QThreadPool::start()
将它们全部提前排队。它们可以从辅助对象发出信号,在每个完成时通知您。
如果工作单元太昂贵而无法一次创建,您将不得不在线程池之上实现一个通知工作队列。
工作单元需要通知队列及其用户它已完成。这可以做到,例如通过重新实现 QRunnable
作为 WorkUnit
的基础,将工作转发给抽象方法,并在抽象方法完成时通知队列。同样的方法也适用于 QtConcurrent::run
,只是你实现了仿函数的 operator()()
.
而不是重新实现 QRunnable::run
队列将为每个完成的工作单元发出一个 workUnitDone
信号。用户应在收到信号后用一项工作重新填充队列(如果没有更多工作,则 none)。
为方便起见,队列可以通过发出 workUnitDone(nullptr)
来请求一些初始工作项。如果每次前一个项目完成时您恰好补充一个项目,队列将保持工作项目的初始数量。
如果项目需要很短的时间来处理,您应该有比线程数更多的可用线程数,这样线程就不会闲置而不工作。对于大部分耗时较长(数十毫秒或更长时间)的项目,QThread::idealThreadCount
.
的 1.5-2 倍就足够了
添加到队列中的工作单元可以是 WorkUnit
的实例,也可以是仿函数。
// https://github.com/KubaO/Whosebugn/tree/master/questions/notified-workqueue-38000605
#include <QtCore>
#include <type_traits>
class WorkUnit;
class WorkQueue : public QObject {
Q_OBJECT
friend class WorkUnit;
QThreadPool m_pool{this};
union alignas(64) { // keep it in its own cache line
QAtomicInt queuedUnits{0};
char filler[64];
} d;
void isDone(WorkUnit * unit) {
auto queued = d.queuedUnits.deref();
emit workUnitDone(unit);
if (!queued) emit finished();
}
public:
explicit WorkQueue(int initialUnits = 0) {
if (initialUnits)
QTimer::singleShot(0, [=]{
for (int i = 0; i < initialUnits; ++i)
emit workUnitDone(nullptr);
});
}
Q_SLOT void addWork(WorkUnit * unit);
template <typename F> void addFunctor(F && functor);
Q_SIGNAL void workUnitDone(WorkUnit *);
Q_SIGNAL void finished();
};
class WorkUnit : public QRunnable {
friend class WorkQueue;
WorkQueue * m_queue { nullptr };
void run() override {
work();
m_queue->isDone(this);
}
protected:
virtual void work() = 0;
};
template <typename F>
class FunctorUnit : public WorkUnit, private F {
void work() override { (*this)(); }
public:
FunctorUnit(F && f) : F(std::move(f)) {}
};
void WorkQueue::addWork(WorkUnit *unit) {
d.queuedUnits.ref();
unit->m_queue = this;
m_pool.start(unit);
}
template <typename F> void WorkQueue::addFunctor(F && functor) {
addWork(new FunctorUnit<typename std::decay<F>::type>{std::forward<F>(functor)});
}
为了演示,让我们在 1us 到 1s 之间的随机时间内进行 50 个单位的 "work" 睡眠。我们将一半的单位作为 SleepyWork
个实例传递,另一半作为 lambdas。
#include <random>
struct SleepyWork : WorkUnit {
int usecs;
SleepyWork(int usecs) : usecs(usecs) {}
void work() override {
QThread::usleep(usecs);
qDebug() << "slept" << usecs;
}
};
int main(int argc, char ** argv) {
QCoreApplication app{argc, argv};
std::random_device dev;
std::default_random_engine eng{dev()};
std::uniform_int_distribution<int> dist{1, 1000000};
auto rand_usecs = [&]{ return dist(eng); };
int workUnits = 50;
WorkQueue queue{2*QThread::idealThreadCount()};
QObject::connect(&queue, &WorkQueue::workUnitDone, [&]{
if (workUnits) {
if (workUnits % 2) {
auto us = dist(eng);
queue.addFunctor([us]{
QThread::usleep(us);
qDebug() << "slept" << us;
});
} else
queue.addWork(new SleepyWork{rand_usecs()});
--workUnits;
}
});
QObject::connect(&queue, &WorkQueue::finished, [&]{
if (workUnits == 0) app.quit();
});
return app.exec();
}
#include "main.moc"
示例到此结束。
我正在尝试编写一个创建 x 个工作线程的系统。这些线程将在不同的时间完成它们的工作。当他们中的任何一个完成他们的工作时,我将检查他们的输出并再次重新启动他们(将 运行 线程数保持在 x 左右)。我将对多次任意迭代执行此操作。因此,基本上一个控制器线程将启动 x 数量的线程,并在它们完成工作后重新启动它们,直到达到一定的迭代次数。
附加说明#1:当我说重启时,等到当前 exits/aborts 被销毁并创建一个新的就完全没问题了。它不必 "restart" 相同的线程。我最感兴趣的是以一种干净的异步方式来做这件事。
注意:我不是在寻找任何特定代码,而是在寻找一些可能的伪代码和使用槽和信号的设计模式。
我知道 qt 线程并使用过它们。我熟悉启动 x 数量的线程并等待所有线程使用 yield 完成并等待的示例。我正在寻找一种干净的方法来使用信号和插槽实现我在第一段中描述的内容。
每个线程都有以下内容:
- 布尔退出
- 在启动线程之前,将其设置为false
- 主线程只应将其设置为 true。订单设置标志为真,加入线程
- 工作线程应该只将它设置为 false。 Order 正在退出循环,进行任何 post 处理,将其设置为 false,返回。
- 信号量(不需要计数器,只是 post / 停止,就像在 pthreads 中一样)
- 主线程将post
- worker thread 将 运行 in a
while( !exit )
,循环中的第一个命令是等待信号量。
- bool threadWorking
- 在启动线程之前,将其设置为false
- 主线程只应将其设置为 true。命令是:准备好数据,将标志设置为真,post semaphore.
- 工作线程应该只将它设置为 false。或者是:准备好答案,通知主线程,将标志设置为 false。
这样你就可以重复使用线程了。
这就是 QtConcurrent::run()
或 QThreadPool::start()
的用途。 Concurrent 框架在内部使用线程池,因此它们是相当等价的:前者是后者的便利包装器。默认线程池最好留给 short-运行ning 任务;对于 运行 长任务,请使用您自己的线程池。您将把它作为第一个参数传递给 QtConcurrent::run()
。
QThreadPool
维护一个工作项队列,将它们分派给线程,并动态创建和销毁工作线程。您不必自己重新实现,这真是太棒了 class。
如果您没有太多的工作单元并且可以预先全部提供,只需使用 QtConcurrent::run()
或 QThreadPool::start()
将它们全部提前排队。它们可以从辅助对象发出信号,在每个完成时通知您。
如果工作单元太昂贵而无法一次创建,您将不得不在线程池之上实现一个通知工作队列。
工作单元需要通知队列及其用户它已完成。这可以做到,例如通过重新实现 QRunnable
作为 WorkUnit
的基础,将工作转发给抽象方法,并在抽象方法完成时通知队列。同样的方法也适用于 QtConcurrent::run
,只是你实现了仿函数的 operator()()
.
QRunnable::run
队列将为每个完成的工作单元发出一个 workUnitDone
信号。用户应在收到信号后用一项工作重新填充队列(如果没有更多工作,则 none)。
为方便起见,队列可以通过发出 workUnitDone(nullptr)
来请求一些初始工作项。如果每次前一个项目完成时您恰好补充一个项目,队列将保持工作项目的初始数量。
如果项目需要很短的时间来处理,您应该有比线程数更多的可用线程数,这样线程就不会闲置而不工作。对于大部分耗时较长(数十毫秒或更长时间)的项目,QThread::idealThreadCount
.
添加到队列中的工作单元可以是 WorkUnit
的实例,也可以是仿函数。
// https://github.com/KubaO/Whosebugn/tree/master/questions/notified-workqueue-38000605
#include <QtCore>
#include <type_traits>
class WorkUnit;
class WorkQueue : public QObject {
Q_OBJECT
friend class WorkUnit;
QThreadPool m_pool{this};
union alignas(64) { // keep it in its own cache line
QAtomicInt queuedUnits{0};
char filler[64];
} d;
void isDone(WorkUnit * unit) {
auto queued = d.queuedUnits.deref();
emit workUnitDone(unit);
if (!queued) emit finished();
}
public:
explicit WorkQueue(int initialUnits = 0) {
if (initialUnits)
QTimer::singleShot(0, [=]{
for (int i = 0; i < initialUnits; ++i)
emit workUnitDone(nullptr);
});
}
Q_SLOT void addWork(WorkUnit * unit);
template <typename F> void addFunctor(F && functor);
Q_SIGNAL void workUnitDone(WorkUnit *);
Q_SIGNAL void finished();
};
class WorkUnit : public QRunnable {
friend class WorkQueue;
WorkQueue * m_queue { nullptr };
void run() override {
work();
m_queue->isDone(this);
}
protected:
virtual void work() = 0;
};
template <typename F>
class FunctorUnit : public WorkUnit, private F {
void work() override { (*this)(); }
public:
FunctorUnit(F && f) : F(std::move(f)) {}
};
void WorkQueue::addWork(WorkUnit *unit) {
d.queuedUnits.ref();
unit->m_queue = this;
m_pool.start(unit);
}
template <typename F> void WorkQueue::addFunctor(F && functor) {
addWork(new FunctorUnit<typename std::decay<F>::type>{std::forward<F>(functor)});
}
为了演示,让我们在 1us 到 1s 之间的随机时间内进行 50 个单位的 "work" 睡眠。我们将一半的单位作为 SleepyWork
个实例传递,另一半作为 lambdas。
#include <random>
struct SleepyWork : WorkUnit {
int usecs;
SleepyWork(int usecs) : usecs(usecs) {}
void work() override {
QThread::usleep(usecs);
qDebug() << "slept" << usecs;
}
};
int main(int argc, char ** argv) {
QCoreApplication app{argc, argv};
std::random_device dev;
std::default_random_engine eng{dev()};
std::uniform_int_distribution<int> dist{1, 1000000};
auto rand_usecs = [&]{ return dist(eng); };
int workUnits = 50;
WorkQueue queue{2*QThread::idealThreadCount()};
QObject::connect(&queue, &WorkQueue::workUnitDone, [&]{
if (workUnits) {
if (workUnits % 2) {
auto us = dist(eng);
queue.addFunctor([us]{
QThread::usleep(us);
qDebug() << "slept" << us;
});
} else
queue.addWork(new SleepyWork{rand_usecs()});
--workUnits;
}
});
QObject::connect(&queue, &WorkQueue::finished, [&]{
if (workUnits == 0) app.quit();
});
return app.exec();
}
#include "main.moc"
示例到此结束。