在 linux 上将 HID 访问与 evdev 集成在一起 Python twisted
Integrating HID access with evdev on linux with Python twisted
在 linux 机器上(Debian wheezy)我正在尝试编写一个基于事件的服务器来执行以下操作:
获取输入设备(特殊键盘)的独占输入,防止击键进入通常的事件链。
报名参加扭曲反应堆中的活动
在等待事件返回的延迟注册回调。此回调将在收到特殊键序列后发送 HTTP 请求。
这是 pyevdev
包中的示例代码。我得到通知并相应地接收击键是有效的。
通过查看 read_loop()
命令的源代码,它也使用了类似于 twisted 的 select
语句。
我的问题
如何将此代码集成到 python Twisted 中?一个想法是查看底层字符设备 /dev/input/event0
并以非阻塞方式从中读取。如果 if 是一个常规文件,我会使用类似于 inotify 的东西,但在这种情况下我不知道。
evdev 包中的示例代码
from evdev import InputDevice, categorize, ecodes, list_devices
devices = [InputDevice(fn) for fn in list_devices()]
for dev in devices:
print(dev.fn, dev.name, dev.phys)
dev = InputDevice('/dev/input/event0')
# get exclusive access to input device
dev.grab()
for event in dev.read_loop():
if event.type == ecodes.EV_KEY:
print categorize(event)
evdev.device.InputDevice
has a fileno()
method , which means that you can hook it up to a Twisted IReactorFDSet
; pretty much all reactors available on Linux where evdev
is relevant implement this interface. Since an event device is an object with a file descriptor that you can mostly just read from, you need an IReadDescriptor
把它包起来。
与您的示例大致相同的逻辑的实现,但使用反应堆来处理事件,可能如下所示:
from zope.interface import implementer
from twisted.internet.interfaces import IReadDescriptor
from twisted.logger import Logger
log = Logger()
@implementer(IReadDescriptor)
class InputDescriptor(object):
def __init__(self, reactor, inputDevice, eventReceiver):
self._reactor = reactor
self._dev = inputDevice
self._receiver = eventReceiver
def fileno(self):
return self._dev.fileno()
def logPrefix(self):
return "Input Device: " + repr(self._dev)
def doRead(self):
evt = self._dev.read_one()
try:
self._receiver.eventReceived(evt)
except:
log.failure("while dispatching HID event")
def connectionLost(self, reason):
self.stop()
self._receiver.connectionLost(reason)
def start(self):
self._dev.grab()
self._reactor.addReader(self)
def stop(self):
self._reactor.removeReader(self)
self._dev.ungrab()
from evdev import InputDevice, categorize, ecodes, list_devices
devices = [InputDevice(fn) for fn in list_devices()]
for dev in devices:
print(dev.fn, dev.name, dev.phys)
dev = InputDevice('/dev/input/event0')
class KeyReceiver(object):
def eventReceived(self, event):
if event.type == ecodes.EV_KEY:
print(categorize(event))
def connectionLost(self, reason):
print("Event device lost!!", reason)
from twisted.internet import reactor
InputDescriptor(reactor, dev, KeyReceiver()).start()
reactor.run()
请注意,此代码完全未经测试,因此一开始它可能无法正常工作,但它至少应该让您了解需要什么。
在 linux 机器上(Debian wheezy)我正在尝试编写一个基于事件的服务器来执行以下操作:
获取输入设备(特殊键盘)的独占输入,防止击键进入通常的事件链。
报名参加扭曲反应堆中的活动
在等待事件返回的延迟注册回调。此回调将在收到特殊键序列后发送 HTTP 请求。
这是 pyevdev
包中的示例代码。我得到通知并相应地接收击键是有效的。
通过查看 read_loop()
命令的源代码,它也使用了类似于 twisted 的 select
语句。
我的问题
如何将此代码集成到 python Twisted 中?一个想法是查看底层字符设备 /dev/input/event0
并以非阻塞方式从中读取。如果 if 是一个常规文件,我会使用类似于 inotify 的东西,但在这种情况下我不知道。
evdev 包中的示例代码
from evdev import InputDevice, categorize, ecodes, list_devices
devices = [InputDevice(fn) for fn in list_devices()]
for dev in devices:
print(dev.fn, dev.name, dev.phys)
dev = InputDevice('/dev/input/event0')
# get exclusive access to input device
dev.grab()
for event in dev.read_loop():
if event.type == ecodes.EV_KEY:
print categorize(event)
evdev.device.InputDevice
has a fileno()
method , which means that you can hook it up to a Twisted IReactorFDSet
; pretty much all reactors available on Linux where evdev
is relevant implement this interface. Since an event device is an object with a file descriptor that you can mostly just read from, you need an IReadDescriptor
把它包起来。
与您的示例大致相同的逻辑的实现,但使用反应堆来处理事件,可能如下所示:
from zope.interface import implementer
from twisted.internet.interfaces import IReadDescriptor
from twisted.logger import Logger
log = Logger()
@implementer(IReadDescriptor)
class InputDescriptor(object):
def __init__(self, reactor, inputDevice, eventReceiver):
self._reactor = reactor
self._dev = inputDevice
self._receiver = eventReceiver
def fileno(self):
return self._dev.fileno()
def logPrefix(self):
return "Input Device: " + repr(self._dev)
def doRead(self):
evt = self._dev.read_one()
try:
self._receiver.eventReceived(evt)
except:
log.failure("while dispatching HID event")
def connectionLost(self, reason):
self.stop()
self._receiver.connectionLost(reason)
def start(self):
self._dev.grab()
self._reactor.addReader(self)
def stop(self):
self._reactor.removeReader(self)
self._dev.ungrab()
from evdev import InputDevice, categorize, ecodes, list_devices
devices = [InputDevice(fn) for fn in list_devices()]
for dev in devices:
print(dev.fn, dev.name, dev.phys)
dev = InputDevice('/dev/input/event0')
class KeyReceiver(object):
def eventReceived(self, event):
if event.type == ecodes.EV_KEY:
print(categorize(event))
def connectionLost(self, reason):
print("Event device lost!!", reason)
from twisted.internet import reactor
InputDescriptor(reactor, dev, KeyReceiver()).start()
reactor.run()
请注意,此代码完全未经测试,因此一开始它可能无法正常工作,但它至少应该让您了解需要什么。