如何为两个单独的 Dbus Python 程序创建 Dbus Mainloop
How to create for Dbus Mainloop for two separate Dbus Python programs
我有一个 python 进程监视 Dbus 服务 (NetworkManager) 并与之交互
目前这个运行在它自己的主程序线程中
import dbus.mainloop.glib
import NetworkManager
PYTHON3 = sys.version_info >= (3, 0)
if PYTHON3:
from gi.repository import GObject as gobject
from gi.repository import GLib as glib
else:
import gobject
class NetworkController(threading.Thread):
def __init__(self, run_daemon_mode=False):
# Set up DBus loop
self.loop = None
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if PYTHON3:
self.loop = glib.MainLoop()
else:
self.loop = gobject.MainLoop()
gobject.threads_init()
#Create the thread
super(NetworkController, self).__init__(daemon=run_daemon_mode)
def run(self):
"""
Method to run the DBus main loop (on a thread)
"""
# NetworkManager init stuff...
logger.debug("Starting Network Controller loop.")
self.loop.run()
logger.debug("Network Controller loop has exited.")
def main(argv):
args=cli(argv)
#create network controller object but don't start the thread
network_con = network_controller.NetworkController(True)
network_con.start() #
try:
while True:
我现在需要添加一个线程来控制同样使用 Dbus 的蓝牙 GATT 服务(pyBluez 实现)。我如何构建我的代码以让每个进程 运行 在它自己的线程中并使用 Dbus。
谢谢!
我创建了新的 class,它使用 Dbus 并且是一个单独的线程,与我创建第一个 class 的方式相同。然后从我的主例程中调用 class 构造函数并启动每个线程。
class AnotherGlibMainloop(threading.Thread):
def __init__(self, run_daemon_mode=False):
# Set up DBus loop
self.loop = None
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if PYTHON3:
self.loop = glib.MainLoop()
else:
self.loop = gobject.MainLoop()
gobject.threads_init()
#Create the thread
super(NetworkController, self).__init__(daemon=run_daemon_mode)
def run(self):
"""
Method to run the DBus main loop (on a thread)
"""
# NetworkManager init stuff...
logger.debug("Starting Another Glib Mainloop.")
self.loop.run()
logger.debug("Another Glib Mainloop has exited.")
def main(argv): args=cli(argv)
#create network controller object but don't start the thread
network_con = network_controller.NetworkController(True)
network_con.start() #
another_glib_mainloop = AnotherGlibMainloop(True)
another_glib_mainloop.start()
try:
while True:
我有一个 python 进程监视 Dbus 服务 (NetworkManager) 并与之交互
目前这个运行在它自己的主程序线程中
import dbus.mainloop.glib
import NetworkManager
PYTHON3 = sys.version_info >= (3, 0)
if PYTHON3:
from gi.repository import GObject as gobject
from gi.repository import GLib as glib
else:
import gobject
class NetworkController(threading.Thread):
def __init__(self, run_daemon_mode=False):
# Set up DBus loop
self.loop = None
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if PYTHON3:
self.loop = glib.MainLoop()
else:
self.loop = gobject.MainLoop()
gobject.threads_init()
#Create the thread
super(NetworkController, self).__init__(daemon=run_daemon_mode)
def run(self):
"""
Method to run the DBus main loop (on a thread)
"""
# NetworkManager init stuff...
logger.debug("Starting Network Controller loop.")
self.loop.run()
logger.debug("Network Controller loop has exited.")
def main(argv):
args=cli(argv)
#create network controller object but don't start the thread
network_con = network_controller.NetworkController(True)
network_con.start() #
try:
while True:
我现在需要添加一个线程来控制同样使用 Dbus 的蓝牙 GATT 服务(pyBluez 实现)。我如何构建我的代码以让每个进程 运行 在它自己的线程中并使用 Dbus。
谢谢!
我创建了新的 class,它使用 Dbus 并且是一个单独的线程,与我创建第一个 class 的方式相同。然后从我的主例程中调用 class 构造函数并启动每个线程。
class AnotherGlibMainloop(threading.Thread):
def __init__(self, run_daemon_mode=False):
# Set up DBus loop
self.loop = None
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
if PYTHON3:
self.loop = glib.MainLoop()
else:
self.loop = gobject.MainLoop()
gobject.threads_init()
#Create the thread
super(NetworkController, self).__init__(daemon=run_daemon_mode)
def run(self):
"""
Method to run the DBus main loop (on a thread)
"""
# NetworkManager init stuff...
logger.debug("Starting Another Glib Mainloop.")
self.loop.run()
logger.debug("Another Glib Mainloop has exited.")
def main(argv): args=cli(argv)
#create network controller object but don't start the thread
network_con = network_controller.NetworkController(True)
network_con.start() #
another_glib_mainloop = AnotherGlibMainloop(True)
another_glib_mainloop.start()
try:
while True: