如何优雅地杀死正在监听消息队列的线程

How to kill threads that are listening to message queue elegantly

在我的 Python 应用程序中,我有一个函数可以使用来自 Amazon SQS FIFO 队列的消息。

def consume_msgs():
    sqs = boto3.client('sqs',
                   region_name='us-east-1',
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    print('STARTING WORKER listening on {}'.format(QUEUE_URL))
    while 1:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=10,
        )
        messages = response.get('Messages', [])
        for message in messages:
            try:
                print('{} > {}'.format(threading.currentThread().getName(), message.get('Body')))
                body = json.loads(message.get('Body'))
                sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message.get('ReceiptHandle'))

            except Exception as e:
                print('Exception in worker > ', e)
                sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message.get('ReceiptHandle'))

    time.sleep(10)

为了扩大规模,我正在使用多线程来处理消息。

if __name__ == '__main__:

    for i in range(3):
        t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
        t.setDaemon(True)
        t.start()
    while True:
        print('Waiting')
        time.sleep(5)

应用程序作为服务运行。如果我需要部署新版本,则必须重新启动它。当主进程终止时,有没有办法让线程正常存在?它们不是突然杀死线程,而是先完成当前消息并停止接收下一条消息。

由于您的线程不断循环,您不能只 join 它们,但您需要向它们发出信号,也该是时候跳出循环了,以便能够做到这一点。这个 docs 提示可能有用:

Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.

有了这个,我把下面的例子放在一起,希望能有所帮助:

from threading import Thread, Event
from time import sleep

def fce(ident, wrap_up_event):
    cnt = 0
    while True:
        print(f"{ident}: {cnt}", wrap_up_event.is_set())
        sleep(3)
        cnt += 1
        if wrap_up_event.is_set():
            break
    print(f"{ident}: Wrapped up")

if __name__ == '__main__':
    wanna_exit = Event()
    for i in range(3):
        t = Thread(target=fce, args=(i, wanna_exit))
        t.start()
    sleep(5)
    wanna_exit.set()

一个事件实例被传递给 fce,它只会无限地保持 运行,但是每次迭代完成后,在返回顶部之前检查事件是否已设置为True。在退出脚本之前,我们从控制线程将此事件设置为 True。由于线程不再标记为守护线程,我们不必显式 join 它们。

根据您想要关闭脚本的具体方式,您将需要处理传入信号(可能 SIGTERM)或 SIGINTKeyboardInterrupt 异常。并在退出前进行清理,其机制保持不变。除了不让 python 立即停止执行之外,您还需要让您的线程知道它们不应该重新进入循环并等待它们加入。


SIGINT 稍微简单一些,因为它作为 python 异常公开,您可以为 "main" 位执行此操作:

if __name__ == '__main__':
    wanna_exit = Event()
    for i in range(3):
        t = Thread(target=fce, args=(i, wanna_exit))
        t.start()
    try:
        while True:
            sleep(5)
            print('Waiting')
    except KeyboardInterrupt:
        pass
    wanna_exit.set()

您当然可以将 SIGINT 发送到具有 kill 的进程,而不仅仅是从控制终端。