如何注册一个 SIGINT 处理程序,一旦按下 Ctrl+C,它就会 运行?

How to register a SIGINT handler that will run as soon as Ctrl+C is pressed?

我正在编写一个 Python 脚本,该脚本使用 AppKit 的 PyObjC 绑定。该脚本向共享 NSWorkspace's notificationCenter 注册一个观察者,然后调用 AppKit.CFRunLoopRun() 以便处理通知:

from __future__ import print_function
from AppKit import *
import signal

shared_workspace = NSWorkspace.sharedWorkspace()

def on_did_activate_application(notification):
    print('on_did_activate_application(notification = %s)' % notification)

notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,
        None,
        None,
        on_did_activate_application)

def handle_interrupt(signum, frame):
    notification_center.removeObserver_(did_activate_application_observer)
    CFRunLoopStop(CFRunLoopGetCurrent())

signal.signal(signal.SIGINT, handle_interrupt)

CFRunLoopRun()

我遇到的问题(可通过上述 MCVE 重现)是,当我在终端 window 运行 上按 Ctrl+C 时,脚本 handle_interrupt() 是不会立即执行,而是在下一次处理 NSWorkspaceDidActivateApplicationNotification 通知时执行。

如何在 Ctrl+C / SIGINT 出现时立即响应?

这可以通过将 "signal wakeup fd" 设置为管道的写入端,然后创建一个 CFFileDescriptor 来监视管道的读取端 activity 来实现。

What’s New in Python 2.6, in Python 2.6, a new API was added to the signal module called set_wakeup_fd() 所述。每当接收到信号时,都会将一个 NUL 字节 ('[=13=]') 写入 fd。

如果wakeup fd设置为管道的写入端,那么可以创建一个CFFileDescriptor来监控管道读取端的activity(数据可用性), activity 上的回调可以配置为 CFRunLoop 上的 运行。

from __future__ import print_function
from AppKit import * # For development only. This takes a long time to complete as there are many symbols.
import fcntl
import os
import signal

shared_workspace = NSWorkspace.sharedWorkspace()

def on_did_activate_application(notification):
    print('on_did_activate_application(notification = %s)' % notification)

notification_center = shared_workspace.notificationCenter()
did_activate_application_observer = notification_center.addObserverForName_object_queue_usingBlock_(
        NSWorkspaceDidActivateApplicationNotification,
        None,
        None,
        on_did_activate_application)

def handle_signal(signum, frame):
    print('handle_signal(signum = %s, frame = <scrubbed>)' % signum)
    if signum == signal.SIGCONT:
        signal.signal(signal.SIGTSTP, handle_signal)
    elif signum == signal.SIGINT:
        notification_center.removeObserver_(did_activate_application_observer)
        CFRunLoopStop(CFRunLoopGetCurrent())
    else:
        # 
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)

r, w = os.pipe()

flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def callout(f, call_back_types, info):
    # Note: The signal handler will run first.

    print('callout()')

    # Clear the pipe of NUL bytes.
    n = 0
    while True:
        try:
            n += len(os.read(r, 100))
        except OSError:
            break
    print('read %d byte(s)' % n)

    # Per https://developer.apple.com/documentation/corefoundation/cffiledescriptor?language=objc
    # "Each call back is one-shot, and must be re-enabled if you want to get another one."
    # Thus we need to re-enable call backs.
    CFFileDescriptorEnableCallBacks(f, kCFFileDescriptorReadCallBack)

file_descriptor = CFFileDescriptorCreate(None, r, True, callout, None)
CFFileDescriptorEnableCallBacks(file_descriptor, kCFFileDescriptorReadCallBack)
run_loop_source = CFFileDescriptorCreateRunLoopSource(None, file_descriptor, 0)
CFRunLoopAddSource(CFRunLoopGetCurrent(), run_loop_source, kCFRunLoopDefaultMode)

signal.set_wakeup_fd(w)
signal.signal(signal.SIGCONT, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTSTP, handle_signal)

# For testing, configure a SIGALRM to be received every two seconds.
signal.signal(signal.SIGALRM, lambda _1, _2: print('SIGALRM'))
signal.setitimer(signal.ITIMER_REAL, 2, 2)

print('about to call CFRunLoopRun()')
CFRunLoopRun()
致谢

非常感谢 'A GUEST'(无论您是谁)在 Stack Overflow 上发帖 this paste on Pastebin; to Da_Blitz for writing the Fighting set_wakeup_fd article; and to the askers/answerers of “Proper” way to handle signals other than SIGINT in Python?, What's the difference between SIGSTOP and SIGTSTP?, and