中断 paho mqtt 客户端以重新加载订阅
Interrupt paho mqtt client to reload subscriptions
我有一个基于配置文件订阅主题的 mqtt 客户端应用程序。类似于:
def connectMQTT():
global Connection
Connection = Client()
Connection.on_message = handleQuery
for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
topic = '{}/{}/Q/+'.format(Basename, clientid)
print('subscription:', topic)
Connection.subscribe(topic)
我一直在通过简单的调用使用它,例如:
def main():
connectMQTT()
Connection.loop_forever()
loop_forever
将永远阻塞。但是我想注意到 clientids.allIDs()
读取的信息何时过时,我应该重新连接强制它重新订阅。
我可以通过 pyinotify
:
检测到文件中的更改
def filesChanged():
# NOT SURE WHAT TO DO HERE
def watchForChanges():
watchManager = pyinotify.WatchManager()
notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(eventCallback))
notifier.start()
watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)
基本上,我需要 loop_forever
(或其他一些 paho mqtt 机制)到 运行,直到一些信号来自 pyinotify 机器。我不确定如何将这两者焊接在一起。在伪代码中,我想要类似
的东西
def main():
signal = setup_directory_change_signal()
while True:
connectMQTT()
Connection.loop(until=signal)
Connection.disconnect()
虽然我不确定如何实现它。
我终于找到了以下似乎有效的解决方案。虽然我试图 运行 另一个线程中的通知程序和主线程中的 mqtt 循环,但技巧似乎是反转该设置:
def restartMQTT():
if Connection:
Connection.loop_stop()
connectMQTT()
Connection.loop_start()
class FileEventHandler(pyinotify.ProcessEvent):
def process_IN_CREATE(self, fileEvent):
restartMQTT()
def process_IN_DELETE(self, fileEvent):
restartMQTT()
def main():
restartMQTT()
watchManager = pyinotify.WatchManager()
notifier = pyinotify.Notifier(watchManager, FileEventHandler())
watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
notifier.loop()
其中 connectMQTT
在 Connection
全局中存储新连接和配置的 MQTT 客户端。
我有一个基于配置文件订阅主题的 mqtt 客户端应用程序。类似于:
def connectMQTT():
global Connection
Connection = Client()
Connection.on_message = handleQuery
for clientid in clientids.allIDs(): # clientids.allIDs() reads files to get this
topic = '{}/{}/Q/+'.format(Basename, clientid)
print('subscription:', topic)
Connection.subscribe(topic)
我一直在通过简单的调用使用它,例如:
def main():
connectMQTT()
Connection.loop_forever()
loop_forever
将永远阻塞。但是我想注意到 clientids.allIDs()
读取的信息何时过时,我应该重新连接强制它重新订阅。
我可以通过 pyinotify
:
def filesChanged():
# NOT SURE WHAT TO DO HERE
def watchForChanges():
watchManager = pyinotify.WatchManager()
notifier = pyinotify.ThreadedNotifier(watchManager, FileEventHandler(eventCallback))
notifier.start()
watchManager.add_watch('/etc/my/config/dir', pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE)
基本上,我需要 loop_forever
(或其他一些 paho mqtt 机制)到 运行,直到一些信号来自 pyinotify 机器。我不确定如何将这两者焊接在一起。在伪代码中,我想要类似
def main():
signal = setup_directory_change_signal()
while True:
connectMQTT()
Connection.loop(until=signal)
Connection.disconnect()
虽然我不确定如何实现它。
我终于找到了以下似乎有效的解决方案。虽然我试图 运行 另一个线程中的通知程序和主线程中的 mqtt 循环,但技巧似乎是反转该设置:
def restartMQTT():
if Connection:
Connection.loop_stop()
connectMQTT()
Connection.loop_start()
class FileEventHandler(pyinotify.ProcessEvent):
def process_IN_CREATE(self, fileEvent):
restartMQTT()
def process_IN_DELETE(self, fileEvent):
restartMQTT()
def main():
restartMQTT()
watchManager = pyinotify.WatchManager()
notifier = pyinotify.Notifier(watchManager, FileEventHandler())
watchManager.add_watch('/etc/my/config_directory', pyinotify.IN_CREATE | pyinotify.IN_DELETE)
notifier.loop()
其中 connectMQTT
在 Connection
全局中存储新连接和配置的 MQTT 客户端。