如何注册一个 SIGINT 处理程序,使其在按下 Ctrl+C 后立即运行?
How to register a SIGINT handler that will run as soon as Ctrl+C is pressed?
我正在编写一个 Python 脚本,该脚本使用 AppKit 的 PyObjC 绑定。该脚本向共享 NSWorkspace 的 notificationCenter 注册一个观察者,然后调用 AppKit.CFRunLoopRun() 以便处理通知:
from __future__ import print_function
from AppKit import *
import signal
# Shared NSWorkspace instance; its notification center is used below.
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
    """Print the notification delivered for an application activation.

    Registered as the block for NSWorkspaceDidActivateApplicationNotification.

    Args:
        notification: The notification object handed to the block by the
            notification center.
    """
    # Wrap the argument in a 1-tuple so the %-format cannot misread a tuple
    # argument as several format values.
    print('on_did_activate_application(notification = %s)' % (notification,))
# Subscribe on_did_activate_application to application-activation
# notifications posted on the shared workspace's notification center. The
# returned observer token is kept so that it can be removed later.
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = (
    notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,  # name
        None,  # object
        None,  # queue
        on_did_activate_application,  # block
    )
)
def handle_interrupt(signum, frame):
    """SIGINT handler: remove the workspace observer and stop the run loop.

    Args:
        signum: Signal number supplied by the signal module (unused).
        frame: Interrupted stack frame supplied by the signal module (unused).
    """
    notification_center.removeObserver_(did_activate_application_observer)
    CFRunLoopStop(CFRunLoopGetCurrent())
# Install the SIGINT (Ctrl+C) handler, then enter the run loop so that
# notifications are processed.
# NOTE: as reported for this MCVE, handle_interrupt() does not run as soon as
# Ctrl+C is pressed; it runs only when the next
# NSWorkspaceDidActivateApplicationNotification is handled.
signal.signal(signal.SIGINT, handle_interrupt)
CFRunLoopRun()
我遇到的问题(可通过上述 MCVE 重现)是:当我在运行脚本的终端窗口中按下 Ctrl+C 时,handle_interrupt() 不会立即执行,而是要等到下一次处理 NSWorkspaceDidActivateApplicationNotification 通知时才执行。
如何在 Ctrl+C / SIGINT 出现时立即响应?
这可以通过将 "signal wakeup fd" 设置为管道的写入端,然后创建一个 CFFileDescriptor 来监视管道读取端的活动来实现。
正如《What’s New in Python 2.6》所述,Python 2.6 在 signal 模块中新增了一个名为 set_wakeup_fd() 的 API。每当接收到信号时,都会将一个 NUL 字节('\0')写入该 fd。
如果将 wakeup fd 设置为管道的写入端,就可以创建一个 CFFileDescriptor 来监控管道读取端的活动(即是否有数据可读),并将该活动触发的回调配置为在 CFRunLoop 上运行。
from __future__ import print_function
from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
import fcntl
import os
import signal
# Shared NSWorkspace instance; its notification center is used below.
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
    """Print the notification delivered for an application activation.

    Registered as the block for NSWorkspaceDidActivateApplicationNotification.

    Args:
        notification: The notification object handed to the block by the
            notification center.
    """
    # Wrap the argument in a 1-tuple so the %-format cannot misread a tuple
    # argument as several format values.
    print('on_did_activate_application(notification = %s)' % (notification,))
# Subscribe on_did_activate_application to application-activation
# notifications posted on the shared workspace's notification center. The
# returned observer token is kept so that it can be removed later.
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = (
    notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,  # name
        None,  # object
        None,  # queue
        on_did_activate_application,  # block
    )
)
def handle_signal(signum, frame):
    """Shared Python-level handler for SIGCONT, SIGINT and SIGTSTP.

    * SIGCONT: re-install this handler for SIGTSTP, because the fallback
      branch below resets a signal to its default disposition.
    * SIGINT: remove the workspace observer and stop the current run loop.
    * Any other signal: restore the default disposition and re-send the
      signal to this process so that the default action takes place.

    Args:
        signum: Number of the signal being handled.
        frame: Interrupted stack frame (unused).
    """
    print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
    if signum == signal.SIGCONT:
        # The process was resumed: take SIGTSTP over again.
        signal.signal(signal.SIGTSTP, handle_signal)
    elif signum == signal.SIGINT:
        notification_center.removeObserver_(did_activate_application_observer)
        CFRunLoopStop(CFRunLoopGetCurrent())
    else:
        # Hand the signal back to the default handler: reset the disposition
        # and deliver the same signal to ourselves again.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
# Pipe used as the signal wakeup channel: `w` is handed to
# signal.set_wakeup_fd() and `r` is watched by the run loop.
r, w = os.pipe()
# Both ends must be non-blocking. The read end is drained until it would
# block. The write end must not block either: Python 3.5+ rejects a blocking
# wakeup fd with ValueError, and a blocking write on a full pipe could stall
# signal delivery.
for fd in (r, w):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def callout(f, call_back_types, info):
    """CFFileDescriptor call back: drain the wakeup pipe and re-arm itself.

    Args:
        f: The CFFileDescriptor the call back fired for; call backs are
            re-enabled on it before returning.
        call_back_types: Call back types passed in by CoreFoundation (unused).
        info: Context info passed in by CoreFoundation (unused).
    """
    # Note: The signal handler will run first.
    print('callout()')
    # Clear the pipe of the bytes written by the signal wakeup mechanism.
    n = 0
    while True:
        try:
            chunk = os.read(r, 100)
        except OSError:
            # The read end is non-blocking, so an OSError here is taken to
            # mean that the pipe is empty.
            break
        if not chunk:
            # End of file (write end closed): without this check the loop
            # would spin forever on empty reads.
            break
        n += len(chunk)
    print('read %d byte(s)' % n)
    # Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
    # "Each call back is one-shot, and must be re-enabled if you want to get another one."
    # Thus we need to re-enable call backs.
    CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)
# Wrap the pipe's read end in a CFFileDescriptor whose call back is callout().
# NOTE(review): the third argument (True) is presumably closeOnInvalidate;
# confirm against the CFFileDescriptorCreate documentation.
file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
# Request a call back when the descriptor becomes readable.
CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
# Attach the descriptor to the current run loop in the default mode.
run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)
# Have the signal module write to the pipe's write end whenever a signal is
# received; the read end is what the run loop source watches.
signal.set_wakeup_fd(w)
# Route SIGCONT, SIGINT and SIGTSTP to the shared Python-level handler.
signal.signal(signal.SIGCONT, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTSTP, handle_signal)
# For testing, configure a SIGALRM to be received every two seconds.
signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
signal.setitimer(signal.ITIMER_REAL, 2, 2)
# Enter the run loop.
print('about to call CFRunLoopRun()')
CFRunLoopRun()
致谢
非常感谢 'A GUEST'(无论您是谁)在 Pastebin 上发布了这段代码(this paste on Pastebin);感谢 Da_Blitz 撰写了《Fighting set_wakeup_fd》一文;也感谢以下问题的提问者和回答者:《“Proper” way to handle signals other than SIGINT in Python?》、《What's the difference between SIGSTOP and SIGTSTP?》等。
我正在编写一个 Python 脚本,该脚本使用 AppKit 的 PyObjC 绑定。该脚本向共享 NSWorkspace 的 notificationCenter 注册一个观察者,然后调用 AppKit.CFRunLoopRun() 以便处理通知:
from __future__ import print_function
from AppKit import *
import signal
# Shared NSWorkspace instance; its notification center is used below.
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
    """Print the notification delivered for an application activation.

    Registered as the block for NSWorkspaceDidActivateApplicationNotification.

    Args:
        notification: The notification object handed to the block by the
            notification center.
    """
    # Wrap the argument in a 1-tuple so the %-format cannot misread a tuple
    # argument as several format values.
    print('on_did_activate_application(notification = %s)' % (notification,))
# Subscribe on_did_activate_application to application-activation
# notifications posted on the shared workspace's notification center. The
# returned observer token is kept so that it can be removed later.
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = (
    notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,  # name
        None,  # object
        None,  # queue
        on_did_activate_application,  # block
    )
)
def handle_interrupt(signum, frame):
    """SIGINT handler: remove the workspace observer and stop the run loop.

    Args:
        signum: Signal number supplied by the signal module (unused).
        frame: Interrupted stack frame supplied by the signal module (unused).
    """
    notification_center.removeObserver_(did_activate_application_observer)
    CFRunLoopStop(CFRunLoopGetCurrent())
# Install the SIGINT (Ctrl+C) handler, then enter the run loop so that
# notifications are processed.
# NOTE: as reported for this MCVE, handle_interrupt() does not run as soon as
# Ctrl+C is pressed; it runs only when the next
# NSWorkspaceDidActivateApplicationNotification is handled.
signal.signal(signal.SIGINT, handle_interrupt)
CFRunLoopRun()
我遇到的问题(可通过上述 MCVE 重现)是:当我在运行脚本的终端窗口中按下 Ctrl+C 时,handle_interrupt() 不会立即执行,而是要等到下一次处理 NSWorkspaceDidActivateApplicationNotification 通知时才执行。
如何在 Ctrl+C / SIGINT 出现时立即响应?
这可以通过将 "signal wakeup fd" 设置为管道的写入端,然后创建一个 CFFileDescriptor 来监视管道读取端的活动来实现。
正如《What’s New in Python 2.6》所述,Python 2.6 在 signal 模块中新增了一个名为 set_wakeup_fd() 的 API。每当接收到信号时,都会将一个 NUL 字节('\0')写入该 fd。
如果将 wakeup fd 设置为管道的写入端,就可以创建一个 CFFileDescriptor 来监控管道读取端的活动(即是否有数据可读),并将该活动触发的回调配置为在 CFRunLoop 上运行。
from __future__ import print_function
from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
import fcntl
import os
import signal
# Shared NSWorkspace instance; its notification center is used below.
shared_workspace = NSWorkspace.sharedWorkspace()
def on_did_activate_application(notification):
    """Print the notification delivered for an application activation.

    Registered as the block for NSWorkspaceDidActivateApplicationNotification.

    Args:
        notification: The notification object handed to the block by the
            notification center.
    """
    # Wrap the argument in a 1-tuple so the %-format cannot misread a tuple
    # argument as several format values.
    print('on_did_activate_application(notification = %s)' % (notification,))
# Subscribe on_did_activate_application to application-activation
# notifications posted on the shared workspace's notification center. The
# returned observer token is kept so that it can be removed later.
notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = (
    notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,  # name
        None,  # object
        None,  # queue
        on_did_activate_application,  # block
    )
)
def handle_signal(signum, frame):
    """Shared Python-level handler for SIGCONT, SIGINT and SIGTSTP.

    * SIGCONT: re-install this handler for SIGTSTP, because the fallback
      branch below resets a signal to its default disposition.
    * SIGINT: remove the workspace observer and stop the current run loop.
    * Any other signal: restore the default disposition and re-send the
      signal to this process so that the default action takes place.

    Args:
        signum: Number of the signal being handled.
        frame: Interrupted stack frame (unused).
    """
    print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
    if signum == signal.SIGCONT:
        # The process was resumed: take SIGTSTP over again.
        signal.signal(signal.SIGTSTP, handle_signal)
    elif signum == signal.SIGINT:
        notification_center.removeObserver_(did_activate_application_observer)
        CFRunLoopStop(CFRunLoopGetCurrent())
    else:
        # Hand the signal back to the default handler: reset the disposition
        # and deliver the same signal to ourselves again.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
# Pipe used as the signal wakeup channel: `w` is handed to
# signal.set_wakeup_fd() and `r` is watched by the run loop.
r, w = os.pipe()
# Both ends must be non-blocking. The read end is drained until it would
# block. The write end must not block either: Python 3.5+ rejects a blocking
# wakeup fd with ValueError, and a blocking write on a full pipe could stall
# signal delivery.
for fd in (r, w):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def callout(f, call_back_types, info):
    """CFFileDescriptor call back: drain the wakeup pipe and re-arm itself.

    Args:
        f: The CFFileDescriptor the call back fired for; call backs are
            re-enabled on it before returning.
        call_back_types: Call back types passed in by CoreFoundation (unused).
        info: Context info passed in by CoreFoundation (unused).
    """
    # Note: The signal handler will run first.
    print('callout()')
    # Clear the pipe of the bytes written by the signal wakeup mechanism.
    n = 0
    while True:
        try:
            chunk = os.read(r, 100)
        except OSError:
            # The read end is non-blocking, so an OSError here is taken to
            # mean that the pipe is empty.
            break
        if not chunk:
            # End of file (write end closed): without this check the loop
            # would spin forever on empty reads.
            break
        n += len(chunk)
    print('read %d byte(s)' % n)
    # Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
    # "Each call back is one-shot, and must be re-enabled if you want to get another one."
    # Thus we need to re-enable call backs.
    CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)
# Wrap the pipe's read end in a CFFileDescriptor whose call back is callout().
# NOTE(review): the third argument (True) is presumably closeOnInvalidate;
# confirm against the CFFileDescriptorCreate documentation.
file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
# Request a call back when the descriptor becomes readable.
CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
# Attach the descriptor to the current run loop in the default mode.
run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)
# Have the signal module write to the pipe's write end whenever a signal is
# received; the read end is what the run loop source watches.
signal.set_wakeup_fd(w)
# Route SIGCONT, SIGINT and SIGTSTP to the shared Python-level handler.
signal.signal(signal.SIGCONT, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTSTP, handle_signal)
# For testing, configure a SIGALRM to be received every two seconds.
signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
signal.setitimer(signal.ITIMER_REAL, 2, 2)
# Enter the run loop.
print('about to call CFRunLoopRun()')
CFRunLoopRun()
致谢
非常感谢 'A GUEST'(无论您是谁)在 Pastebin 上发布了这段代码(this paste on Pastebin);感谢 Da_Blitz 撰写了《Fighting set_wakeup_fd》一文;也感谢以下问题的提问者和回答者:《“Proper” way to handle signals other than SIGINT in Python?》、《What's the difference between SIGSTOP and SIGTSTP?》等。