Qt 的 Signals/Slots 临时一次性连接

Qt's Signals/Slots temporary one time connection

我有一个问题,也许你能帮我。

在特定线程中有一个 amqp 客户端与 rabbitmq 服务器通信。

我需要在客户端开始使用某些资源之前锁定(并检查它是否已经锁定)。 LockResource是各种功能的必备前提。

在第一个例子中,我断开了最后一个连接的兰巴,但有一个奇怪的行为,do_stuff() 有时被调用,有时不...

在第二个例子中,一切正常,但它会杀死多线程。

第三,为了避免内存泄漏,我需要将 AmqpClient 更改为在出现错误时始终使用“幻数”发出 resourceLocked,并在调用 do_stuff() 之前检查它...这一点都不好……

可能是我做错了什么,或者我理解错了。 如果你有更好的方法,我采纳


编辑 2018 年 9 月 16 日

我用不准确的解释误导了你: do_stuff() 不是唯一的方法,而是各种指令,否则我会直接连接它。 最好写 […] 而不是 do_stuff()。 此外,实例中只有一个唯一的客户端。

我无法提前知道用户会先执行什么。 他可以锁定资源到readResourceContent, deleteResource, writeResourceProperty, …, 所以所有这些都必须执行不同的指令。

好的是,多亏了你的回答,我有了一个使用 bool QMetaObject::invokeMethod(QObject *context, Functor function, Qt::ConnectionType type = Qt::AutoConnection, FunctorReturnType *ret = nullptr)

的有效解决方案

一般声明

using Callback = std::function<void()>;
Q_DECLARE_METATYPE(Callback)

qRegisterMetaType<Callback>("Callback");

AmqpClient.cpp

void AmqpClient::lockResource(Identifier identifier, QObject *context, const Callback &func)
{
    if(lockedResources_.contains(identifier))
    {
        QMetaObject::invokeMethod(context,
                                  func,
                                  Qt::QueuedConnection);
        return;
    }

    QString queueName(QString::number(identifier) + ".lock");
    QAmqpQueue *lockQueue = client_->createQueue(queueName);

    connect(lockQueue, qOverload<QAMQP::Error>(&QAmqpQueue::error), this, [this](QAMQP::Error error) {
        if(error == QAMQP::ResourceLockedError) {
            emit errorMessage("The expected resource is already locked by another user.");
            sender()->deleteLater();
        }
    });

    connect(lockQueue, &QAmqpQueue::declared, this, [=]() {
        QAmqpQueue *lockQueue = qobject_cast<QAmqpQueue*>(sender());
        lockQueue->consume(QAmqpQueue::coExclusive);
        lockedResources_[identifier] = lockQueue;

        QMetaObject::invokeMethod(context,
                                  func,
                                  Qt::QueuedConnection);
    });

    lockQueue->declare(QAmqpQueue::Exclusive | QAmqpQueue::AutoDelete);
}

Controller.cpp

void Controller::readResourceContent(int row) 
{    
    [...]
    QMetaObject::invokeMethod(amqp_,
                              "lockResource",
                              Qt::AutoConnection,
                              Q_ARG(Identifier, identifier),
                              Q_ARG(QObject*, this),
                              Q_ARG(Callback, [&](){ [...] }));
    [...]
}

1

// called not inside connect(...) because it may not to emit the signal 
// (if resource is already locked)
disconnect(amqp_, &AmqpClient::resourceLocked, 0, 0);

connect(amqp_, &AmqpClient::resourceLocked, this, [&](){
  do_stuff();
});

emit lockResource(identifier, QPrivateSignal());

2

// This is working like a charm, but I'm losing ui reactivity
QEventLoop loop;
connect(amqp_, &AmqpClient::resourceLocked, &loop, &QEventLoop::quit);
emit lockResource(identifier, QPrivateSignal());
loop.exec();
do_stuff();

3

// Using an intermediate object
class CallbackObject : public QObject
{
    Q_OBJECT
    std::function<void()> callback;

public:
    CallbackObject(std::function<void()> callback) : QObject(), callback(callback) {}

public slots:
    void execute() { callback(); deleteLater(); }
};

// Working but memory leak if signal is not emitted 
// resource already locked for example
CallbackObject *helper = new CallbackObject([&](){
  do_stuff() ;
});
connect(amqp_, &AmqpClient::resourceLocked, helper, &CallbackObject::execute);
emit lockResource(identifier, QPrivateSignal());

如果我对你的理解正确,你正在尝试设置以下事件序列:

          Client                          AMQPClient
             |       lockResource(id)        |
             |------------------------------>|
             |                               |
             |                               |--\
             |                               |   | 
             |        resourceLocked         |   | try to acquire resource
             |<------------------------------|   |
          /--|                               |<-/
do_stuff |   |                               |
          \->|                               |
             |                               |

AMQP 无法获取资源时不会发送消息 resourceLocked,因此在这种情况下也不会调用 do_stuff

使用实现 1 时未调用 do_stuff 的问题是当您有多个客户端等待其各自的资源同时被锁定时的竞争条件。演示您描述的问题(加上该方法以及您的实施 3 的其他问题)的一个序列如下:

  1. 客户端 A 进入功能 1:它断开与 AmqpClient::resourceLocked 的所有现有连接,然后将自己连接到该信号。
  2. 客户端 B 现在也进入功能 1:它还断开与 AmpqClient::resourceLocked 的所有现有连接,特别是客户端 A 刚刚建立的连接。然后它自己连接到该信号。
  3. AmpqClient 处理客户端 A 的 lockResource 请求。它获取了请求的资源并发出 resourceLocked 信号。
  4. 客户端 B,作为唯一连接到信号的客户端,现在执行 do_stuff,即使它是客户端 A 请求的资源已获取。
  5. AmpqClient 处理客户端 B 的 lockResource 请求。同样,它成功获取资源并发出 resourceLocked 信号。
  6. 客户端 B,仍然连接到那个信号,再次执行 do_stuff

上面的序列表明 do_stuff 客户端 A 未被调用并不算太糟糕,因为客户端 B 在其资源尚未被获取时就开始工作。

要解决此问题,您必须确保只调用刚刚处理了 lockResource 请求的客户端的 do_stuff。一个信号——根据设计总是通知所有个观察者——只是一种次优的方法,因为观察者随后需要检查信号是否是为了通知他们或其他人。

第一个修复是修改 lockResource 信号以同时发送发出信号的客户端的 this 指针。这样 AmqpClient 可以使用该指针在客户端获取资源时回调它。

连接到信号的插槽的一个实现可能如下所示:

void AmqpClient::handleLockResourceRequest(int identifier,
                                           QObject* requestingClient)
{
   // try to acquire resource described by `identifier`

   if (resource_acquired_successfully)
   {
       QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
   }
 }

请注意,为了使 invokeMethod 起作用,do_stuff 必须是插槽或需要标记为 Q_INVOKABLE

但我会更进一步,而不是通过 invokeMethod 调用 do_stuff:据我所知,AmqpClient 是唯一连接到 [= 的侦听器20=],如果这两个对象在同一个线程中,您可能只需要使用直接调用 amqp_->tryToLockResource(identifier) 而不是 emit lockResource(identifier)。 IE。使用信号的唯一原因是调用通过事件循环。

除了使用信号,您还可以使用 invokeMethod 通过事件循环请求获取资源。这样做的好处是您的客户端 class 不再向 class 的用户暴露它需要获取资源来完成其工作的事实。

总而言之,生成的代码如下所示:

class ClientType : public QObject {
  Q_OBJECT
public:
  // ...

  Q_INVOKABLE void do_stuff(); // definition as before

private:
   void requestResource(); // was previously code block 1 in your question

private:
  AmqpClient* amqp_;
  // ...
};

inline void ClientType::requestResource() 
{
  auto identifier = ...; // create resource identifier

  QMetaObject::invokeMethod(amqp_, 
                            "requestResource", 
                            Qt::AutoConnection, 
                            Q_ARG(int, identifier),
                            Q_ARG(QObject*, this));
}



class AmqpClient : public QObject {
  Q_OBJECT
public:
  // ...

  Q_INVOKABLE requestResource(int identifier, QObject* requestingClient); 

};

inline void AmqpClient::requestResource(int identifier,
                                        QObject* requestingClient)
{
   // try to acquire resource described by `identifier`

   if (resource_acquired_successfully)
   {
       QMetaObject::invokeMethod(requestingClient, "do_stuff", Qt::AutoConnection);
   }
}

在上面的实现中,我假定资源标识符的类型为 int,但是您当然可以使用在 Qt 元类型系统中注册的任何其他类型。类似地,在向 Qts 元类型系统注册指针类型后,您还可以使用 ClientType*(或抽象基 class 以避免引入循环依赖),而不是传递 QObject-指针。