中断 paho mqtt 客户端以重新加载订阅

Interrupt paho mqtt client to reload subscriptions

我有一个基于配置文件订阅主题的 mqtt 客户端应用程序。类似于:

def connectMQTT():
    global Connection
    Connection = Client()
    Connection.on_message = handleQuery
    for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
        topic = '{}/{}/Q/+'.format(Basename, clientid)
        print('subscription:', topic)
        Connection.subscribe(topic)

我一直在通过简单的调用使用它,例如:

def main():
    connectMQTT()
    Connection.loop_forever()

loop_forever 将永远阻塞。但是我想注意到 clientids.allIDs() 读取的信息何时过时,我应该重新连接强制它重新订阅。

我可以通过 pyinotify:

检测到文件中的更改
def filesChanged():
    # NOT SURE WHAT TO DO HERE

def watchForChanges():
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(eventCallback))
    notifier.start()
    watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)

基本上,我需要 loop_forever(或其他一些 paho mqtt 机制)到 运行,直到一些信号来自 pyinotify 机器。我不确定如何将这两者焊接在一起。在伪代码中,我想要类似

的东西
def main():
    signal = setup_directory_change_signal()
    while True:
        connectMQTT()
        Connection.loop(until=signal)
        Connection.disconnect()

虽然我不确定如何实现它。

我终于找到了以下似乎有效的解决方案。虽然我试图 运行 另一个线程中的通知程序和主线程中的 mqtt 循环,但技巧似乎是反转该设置:

def restartMQTT():
    if Connection:
        Connection.loop_stop()
    connectMQTT()
    Connection.loop_start()

class FileEventHandler(pyinotify.ProcessEvent):
    def process_IN_CREATE(self, fileEvent):
        restartMQTT()

    def process_IN_DELETE(self, fileEvent):
        restartMQTT()


def main():
    restartMQTT()
    watchManager = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(watchManager, FileEventHandler())
    watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
    notifier.loop()

其中 connectMQTTConnection 全局中存储新连接和配置的 MQTT 客户端。