Qt 应用程序中具有共享资源的工作线程
Worker threads with shared resources in Qt application
我正在开发一个涉及与一个或多个设备进行串行通信的 Qt 应用程序。有不同的程序可以同时执行,每个程序可以向设备发送一个或未知数量的命令,并可以接收数据作为响应。为了更清楚地说明,这里是场景的图形说明:
点击一个按钮触发相应过程的执行。 因此,当用户在短时间内单击两个或多个按钮时,两个或多个不同的过程可能会同时 运行ning。实际上,它们之间唯一可以共享的是与单个设备的串行通信;否则它们大部分是相互独立的。这里有两个 pseudo-code 程序可能看起来像的例子:
程序A:
begin
write command a1 on serial port
wait for one second
perform some computations
write command a2 on serial port
wait for one second
end
程序B:
begin
while true:
write command b1 on serial port
read the response from serial port
perform some computations
if a condition holds return, otherwise continue
end
我的解决方案及其问题:
为简化情况,假设只有一台设备需要与之通信。由于程序可以同时执行(并且一次只有一个程序可以通过串口与设备通信)我为每个程序创建了一个线程和一个工作人员class,并将工作人员移动到他们对应的线程。为了在访问串行端口时同步程序,我创建了一个互斥体:
MainWindow.h
class MainWindow : public QMainWindow {
public:
//...
QSerialPort* serial_;
QMutex serial_mutex_;
private:
//...
ProcAWorker* proca_worker;
ProcBWorker* procb_worker;
ProcCWorker* procc_worker;
ProcDWorker* procd_worker;
QThread proca_thread;
QThread procb_thread;
QThread procc_thread;
QThread procd_thread;
}
MainWindow.cpp
void MainWindow::onConnectButtonClicked()
{
serial_ = new QSerialPort();
// configure serial port settings
serial_->open(QIODevice::ReadWrite);
}
void MainWindow::onButtonAClicked()
{
proca_worker = new ProcAWorker(0, this); // pass a pointer to this class to be able to access its methods and members
proca_worker->moveToThread(&proca_thread);
// setup worker-thread connections: started, quit, finished, etc.
proca_thread.start(); // triggers `proccess` slot in proca_worker
}
// same thing for other buttons and procedures
ProcAWorker.cpp
void ProcAWorker::ProcAWorker(QObject *parent, QMainWindow *wnd) :
QObject(parent), wnd_(wnd)
{
}
void ProcAWorker::process()
{
wnd_->serial_mutex_->lock();
wnd_->serial_->write('Command a1'); // Warning occurs in this line
bool write_ok = client_->serial_->waitForBytesWritten(SERIAL_WRITE_TIMEOUT);
wnd_->serial_mutex_->unlock();
QThread::sleep(1);
// perform some computations
wnd_->serial_mutex_->lock();
wnd_->serial_->write('Command a2');
bool write_ok = client_->serial_->waitForBytesWritten(SERIAL_WRITE_TIMEOUT);
wnd_->serial_mutex_->unlock();
if (write_ok) {
// signal successful to main window
emit success();
}
}
但是,当对串口执行写操作时(即wnd_->serial_->write('Command a1');
),显示如下警告:
QObject: Cannot create children for a parent that is in a different
thread. (Parent is QSerialPort(0x18907d0), parent's thread is
QThread(0x13cbc50), current thread is QThread(0x17d8d08)
我的问题:
1) 我已经查看了 Whosebug 上关于此警告的其他问题,但他们的回答只提到应该使用 signal/slot。我熟悉使用 signal/slot 与工作线程通信。但是,我无法弄清楚如何使用 signal/slot 实现我的特定场景(同时 运行ning 过程与共享资源,如串口),或者我如何修改我当前的解决方案来解决这个问题? 请注意,应允许程序 运行 并行(除非在它们想要与设备通信的那些时刻)。显然可以 运行 按顺序执行这些程序(即一个接一个),但我不是在寻找这样的解决方案。
2) 实际上还有一个"Halt" 按钮可以停止所有运行ning 程序并向设备发送停止命令。但我也想不出实现这个功能(设置标志、发送退出信号等)。能否请您在这方面也给我一些提示?
首先,您不需要显式多线程(它是可选的),其次您不需要任何手动管理的同步原语。
然后,使用状态机为每个过程建模。希望通信协议允许每个过程识别对自己命令的响应,这样即使您将传入数据复制到所有过程,它们也会忽略与它们无关的数据。
has a sketch of a solution that does exactly what you want, sans multiplexing. Multiplexing a QIODevice
is trivial when you expose it via local pipes:从端口传入的所有内容都写入一个或多个本地管道的一端。从管道传入的所有内容都写入端口。只要您以 Unbuffered
模式打开它们的过程结束,管道就会保持数据包的完整性。这样每个 write
将作为连续的字节块到达串行端口,并以相同的方式写入端口。
你会如何复用?像这样:
class IODeviceMux : public QObject {
Q_OBJECT
QVector<QPointer<AppPipe>> m_portPipes;
QVector<QPointer<AppPipe>> m_userPipes;
QPointer<QSerialPort> m_port;
public:
IODeviceMux(QObject *parent = {}) : QObject(parent) {}
void setPort(QIODevice *port) {
if (m_port) {
disconnect(m_port.get(), 0, this, 0);
m_userPipes.removeAll({});
for (auto pipe : qAsConst(m_userPipes))
disconnect(m_port.get(), 0, pipe.get(), 0);
}
m_port = port;
connect(m_port.get(), &QIODevice::readyRead, this, &IODeviceMux::onPortRead);
}
AppPipe *getPipe() {
QScopedPointer<AppPipe> user(new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered));
auto *port = new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered, this);
user->addOther(port);
connect(port, &QIODevice::readyRead, this, &IODeviceMux::onPipeRead);
connect(m_port.get(), &QIODevice::bytesWritten, user.get(), &QIODevice::bytesWritten);
connect(user, &QObject::destroyed, port, &QObject::deleteLater);
m_userPipes.push_back(user.get());
m_portPipes.push_back(port);
return user.take();
}
private:
void onPortRead() {
if (!m_port) return;
auto data = m_port->readAll();
m_portPipes.removeAll({});
for (auto pipe : qAsConst(m_portPipes))
pipe->write(data);
}
void onPipeRead() {
auto *pipe = qobject_cast<AppPipe*>(sender());
QByteArray data;
if (pipe) data = pipe->readAll();
if (m_port) m_port->write(data);
}
};
每个过程都会 getPipe()
并将管道视为串行端口设备。每个 write
进入管道都会在端口上忠实地执行。端口上的每个 readyRead
都被忠实地转发,相同的数据量可以立即读取。连端口的bytesWritten
都转发了。但是 bytesToWrite
不起作用 - 它总是 returns 零。这可以通过向 AppPipe
添加一个选项来查询此值来解决。
我想这就是让它正常工作所需的全部内容。
我正在开发一个涉及与一个或多个设备进行串行通信的 Qt 应用程序。有不同的程序可以同时执行,每个程序可以向设备发送一个或未知数量的命令,并可以接收数据作为响应。为了更清楚地说明,这里是场景的图形说明:
点击一个按钮触发相应过程的执行。 因此,当用户在短时间内单击两个或多个按钮时,两个或多个不同的过程可能会同时 运行ning。实际上,它们之间唯一可以共享的是与单个设备的串行通信;否则它们大部分是相互独立的。这里有两个 pseudo-code 程序可能看起来像的例子:
程序A:
begin
write command a1 on serial port
wait for one second
perform some computations
write command a2 on serial port
wait for one second
end
程序B:
begin
while true:
write command b1 on serial port
read the response from serial port
perform some computations
if a condition holds return, otherwise continue
end
我的解决方案及其问题:
为简化情况,假设只有一台设备需要与之通信。由于程序可以同时执行(并且一次只有一个程序可以通过串口与设备通信)我为每个程序创建了一个线程和一个工作人员class,并将工作人员移动到他们对应的线程。为了在访问串行端口时同步程序,我创建了一个互斥体:
MainWindow.h
class MainWindow : public QMainWindow {
public:
//...
QSerialPort* serial_;
QMutex serial_mutex_;
private:
//...
ProcAWorker* proca_worker;
ProcBWorker* procb_worker;
ProcCWorker* procc_worker;
ProcDWorker* procd_worker;
QThread proca_thread;
QThread procb_thread;
QThread procc_thread;
QThread procd_thread;
}
MainWindow.cpp
void MainWindow::onConnectButtonClicked()
{
serial_ = new QSerialPort();
// configure serial port settings
serial_->open(QIODevice::ReadWrite);
}
void MainWindow::onButtonAClicked()
{
proca_worker = new ProcAWorker(0, this); // pass a pointer to this class to be able to access its methods and members
proca_worker->moveToThread(&proca_thread);
// setup worker-thread connections: started, quit, finished, etc.
proca_thread.start(); // triggers `proccess` slot in proca_worker
}
// same thing for other buttons and procedures
ProcAWorker.cpp
void ProcAWorker::ProcAWorker(QObject *parent, QMainWindow *wnd) :
QObject(parent), wnd_(wnd)
{
}
void ProcAWorker::process()
{
wnd_->serial_mutex_->lock();
wnd_->serial_->write('Command a1'); // Warning occurs in this line
bool write_ok = client_->serial_->waitForBytesWritten(SERIAL_WRITE_TIMEOUT);
wnd_->serial_mutex_->unlock();
QThread::sleep(1);
// perform some computations
wnd_->serial_mutex_->lock();
wnd_->serial_->write('Command a2');
bool write_ok = client_->serial_->waitForBytesWritten(SERIAL_WRITE_TIMEOUT);
wnd_->serial_mutex_->unlock();
if (write_ok) {
// signal successful to main window
emit success();
}
}
但是,当对串口执行写操作时(即wnd_->serial_->write('Command a1');
),显示如下警告:
QObject: Cannot create children for a parent that is in a different thread. (Parent is QSerialPort(0x18907d0), parent's thread is QThread(0x13cbc50), current thread is QThread(0x17d8d08)
我的问题:
1) 我已经查看了 Whosebug 上关于此警告的其他问题,但他们的回答只提到应该使用 signal/slot。我熟悉使用 signal/slot 与工作线程通信。但是,我无法弄清楚如何使用 signal/slot 实现我的特定场景(同时 运行ning 过程与共享资源,如串口),或者我如何修改我当前的解决方案来解决这个问题? 请注意,应允许程序 运行 并行(除非在它们想要与设备通信的那些时刻)。显然可以 运行 按顺序执行这些程序(即一个接一个),但我不是在寻找这样的解决方案。
2) 实际上还有一个"Halt" 按钮可以停止所有运行ning 程序并向设备发送停止命令。但我也想不出实现这个功能(设置标志、发送退出信号等)。能否请您在这方面也给我一些提示?
首先,您不需要显式多线程(它是可选的),其次您不需要任何手动管理的同步原语。
然后,使用状态机为每个过程建模。希望通信协议允许每个过程识别对自己命令的响应,这样即使您将传入数据复制到所有过程,它们也会忽略与它们无关的数据。
QIODevice
is trivial when you expose it via local pipes:从端口传入的所有内容都写入一个或多个本地管道的一端。从管道传入的所有内容都写入端口。只要您以 Unbuffered
模式打开它们的过程结束,管道就会保持数据包的完整性。这样每个 write
将作为连续的字节块到达串行端口,并以相同的方式写入端口。
你会如何复用?像这样:
class IODeviceMux : public QObject {
Q_OBJECT
QVector<QPointer<AppPipe>> m_portPipes;
QVector<QPointer<AppPipe>> m_userPipes;
QPointer<QSerialPort> m_port;
public:
IODeviceMux(QObject *parent = {}) : QObject(parent) {}
void setPort(QIODevice *port) {
if (m_port) {
disconnect(m_port.get(), 0, this, 0);
m_userPipes.removeAll({});
for (auto pipe : qAsConst(m_userPipes))
disconnect(m_port.get(), 0, pipe.get(), 0);
}
m_port = port;
connect(m_port.get(), &QIODevice::readyRead, this, &IODeviceMux::onPortRead);
}
AppPipe *getPipe() {
QScopedPointer<AppPipe> user(new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered));
auto *port = new AppPipe(QIODevice::ReadWrite | QIODevice::Unbuffered, this);
user->addOther(port);
connect(port, &QIODevice::readyRead, this, &IODeviceMux::onPipeRead);
connect(m_port.get(), &QIODevice::bytesWritten, user.get(), &QIODevice::bytesWritten);
connect(user, &QObject::destroyed, port, &QObject::deleteLater);
m_userPipes.push_back(user.get());
m_portPipes.push_back(port);
return user.take();
}
private:
void onPortRead() {
if (!m_port) return;
auto data = m_port->readAll();
m_portPipes.removeAll({});
for (auto pipe : qAsConst(m_portPipes))
pipe->write(data);
}
void onPipeRead() {
auto *pipe = qobject_cast<AppPipe*>(sender());
QByteArray data;
if (pipe) data = pipe->readAll();
if (m_port) m_port->write(data);
}
};
每个过程都会 getPipe()
并将管道视为串行端口设备。每个 write
进入管道都会在端口上忠实地执行。端口上的每个 readyRead
都被忠实地转发,相同的数据量可以立即读取。连端口的bytesWritten
都转发了。但是 bytesToWrite
不起作用 - 它总是 returns 零。这可以通过向 AppPipe
添加一个选项来查询此值来解决。
我想这就是让它正常工作所需的全部内容。