如何正确终止阻塞的线程(Lparallel Common Lisp)
How to Properly Terminate a Thread which is Blocking (Lparallel Common Lisp)
在 Lparallel API 中,终止所有线程任务的推荐方法是使用 (lparallel:end-kernel)
停止内核。但是当一个线程阻塞时——例如,(pop-queue queue1)
等待一个项目出现在队列中——当内核停止时它仍然是活动的。在这种情况下(至少在 SBCL 中)内核关闭偶尔(但不是每次)都会失败:
debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
The ANSI Standard, Glossary entry for "bounding index designator"
The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR
debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
Interactive interrupt at #x1001484328.
我假设这与阻塞线程未正确终止有关。在关闭内核之前应该如何正确终止阻塞线程? (API 说 kill-tasks
只应在特殊情况下使用,我认为不适用于这种“正常”关机情况。)
终止线程的问题是它可能发生在任何地方,当线程可能处于任何未知状态时。
安全终止线程的唯一方法是让它优雅地自行关闭,这意味着您希望在正常操作期间,线程有一种方法知道它应该停止工作。然后你可以正确清理你的资源,关闭数据库,释放外部指针,记录所有事情,...
您正在使用的队列有可能会超时的操作,这是一种简单而安全的方法来确保您可以避免永远阻塞并正确退出。但这不是唯一的选择(除了下面显示的内容之外,您还可以使用它们)。
共享/全局标志
发生超时或收到消息时,您检查全局布尔变量(或在所有感兴趣的线程之间共享的变量)。那也是一种简单的退出方式,而且可以被多线程读取。然而,这是并发访问,因此您应该使用锁或原子操作 (http://www.sbcl.org/manual/#Atomic-Operations),例如使用 defglobal
和带有 atomic-incf
的 fixnum 类型,等等
控制消息
在队列中发送控制数据并使用它们来确定如何正常关闭、如何在管道中传播信息或如何重新启动。这是安全的(只是消息传递)并且允许您在线程中实现任何类型的控制。
(defpackage :so (:use :cl :bt :lparallel.queue))
(in-package :so)
让我们定义两个服务。
第一个回显它的输入:
(defun echo (in out)
(lambda ()
(loop
for value = (pop-queue in)
do (push-queue value out)
until (eq value :stop))))
注意在给定 :stop
输入时它是如何正确完成的,以及它如何将 :stop
消息传播到它的输出队列。
第二个线程将执行模块化加法,并在请求之间休眠:
(defun modulo-adder (x m in out)
(lambda ()
(loop
for value = (progn (sleep 0.02)
(pop-queue in))
do (push-queue (typecase value
(keyword value)
(number (mod (+ x value) m)))
out)
until (eq value :stop))))
创建队列:
(defparameter *q1* (make-queue))
(defparameter *q2* (make-queue))
创建线程:
(progn
(bt:make-thread (echo *q1* *q2*) :name "echo")
(bt:make-thread (modulo-adder 5 1024 *q2* *q1*) :name "adder"))
两个线程以循环方式相互连接,创建一个无限的加法循环。当前线程之间没有交换任何值,您可以看到它们 运行 例如 slime-list-threads
或任何其他实现提供的方式;无论如何(bt:all-threads)
returns一个列表。
slime-list-threads
10 adder Running
11 echo Running
...
添加一个项目,现在线程之间可以无限交换数据:
(push-queue 10 *q1*)
等等,然后阻止他们:
(push-queue :stop *q1*)
两个线程均正常停止(它们在线程列表中不再可见)。
我们可以检查队列中剩余的内容(结果因测试而异):
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(99 NIL)
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(:STOP NIL)
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(NIL NIL)
中断线程
您创建了一个由消息或全局标志控制的服务,但随后出现错误,线程挂起。与其杀死它并丢失所有内容,不如至少适当地展开线程堆栈。这也很危险,但是您可以使用 bt:interrupt
立即停止线程 在 运行 的任何位置 并执行函数。
(define-condition stop () ())
(defun signal-stop ()
(signal 'stop))
(defun endless ()
(let ((output *standard-output*))
(lambda ()
(print "START" output)
(unwind-protect (handler-case (loop)
(stop ()
(print "INTERRUPTED" output)))
(print "STOP" output)))))
开始吧:
(bt:make-thread (endless) :name "loop")
这会打印 "START"
并循环。
然后我们打断它:
(bt:interrupt-thread (find "loop"
(bt:all-threads)
:test #'string=
:key #'bt:thread-name)
#'signal-stop)
打印如下:
"INTERRUPTED"
"STOP"
如果线程被终止,这些消息将不会被打印出来,但请注意,考虑到中断的随机性,您仍然可以设法获得损坏的数据。此外,它还可以解除阻塞调用,例如 sleep
或 pop-queue
.
在 Lparallel API 中,终止所有线程任务的推荐方法是使用 (lparallel:end-kernel)
停止内核。但是当一个线程阻塞时——例如,(pop-queue queue1)
等待一个项目出现在队列中——当内核停止时它仍然是活动的。在这种情况下(至少在 SBCL 中)内核关闭偶尔(但不是每次)都会失败:
debugger invoked on a SB-KERNEL:BOUNDING-INDICES-BAD-ERROR in thread
#<THREAD "lparallel" RUNNING {1002F04973}>:
The bounding indices 1 and NIL are bad for a sequence of length 0.
See also:
The ANSI Standard, Glossary entry for "bounding index designator"
The ANSI Standard, writeup for Issue SUBSEQ-OUT-OF-BOUNDS:IS-AN-ERROR
debugger invoked on a SB-SYS:INTERACTIVE-INTERRUPT in thread
#<THREAD "main thread" RUNNING {10012E0613}>:
Interactive interrupt at #x1001484328.
我假设这与阻塞线程未正确终止有关。在关闭内核之前应该如何正确终止阻塞线程? (API 说 kill-tasks
只应在特殊情况下使用,我认为不适用于这种“正常”关机情况。)
终止线程的问题是它可能发生在任何地方,当线程可能处于任何未知状态时。 安全终止线程的唯一方法是让它优雅地自行关闭,这意味着您希望在正常操作期间,线程有一种方法知道它应该停止工作。然后你可以正确清理你的资源,关闭数据库,释放外部指针,记录所有事情,...
您正在使用的队列有可能会超时的操作,这是一种简单而安全的方法来确保您可以避免永远阻塞并正确退出。但这不是唯一的选择(除了下面显示的内容之外,您还可以使用它们)。
共享/全局标志
发生超时或收到消息时,您检查全局布尔变量(或在所有感兴趣的线程之间共享的变量)。那也是一种简单的退出方式,而且可以被多线程读取。然而,这是并发访问,因此您应该使用锁或原子操作 (http://www.sbcl.org/manual/#Atomic-Operations),例如使用 defglobal
和带有 atomic-incf
的 fixnum 类型,等等
控制消息
在队列中发送控制数据并使用它们来确定如何正常关闭、如何在管道中传播信息或如何重新启动。这是安全的(只是消息传递)并且允许您在线程中实现任何类型的控制。
(defpackage :so (:use :cl :bt :lparallel.queue))
(in-package :so)
让我们定义两个服务。
第一个回显它的输入:
(defun echo (in out)
(lambda ()
(loop
for value = (pop-queue in)
do (push-queue value out)
until (eq value :stop))))
注意在给定 :stop
输入时它是如何正确完成的,以及它如何将 :stop
消息传播到它的输出队列。
第二个线程将执行模块化加法,并在请求之间休眠:
(defun modulo-adder (x m in out)
(lambda ()
(loop
for value = (progn (sleep 0.02)
(pop-queue in))
do (push-queue (typecase value
(keyword value)
(number (mod (+ x value) m)))
out)
until (eq value :stop))))
创建队列:
(defparameter *q1* (make-queue))
(defparameter *q2* (make-queue))
创建线程:
(progn
(bt:make-thread (echo *q1* *q2*) :name "echo")
(bt:make-thread (modulo-adder 5 1024 *q2* *q1*) :name "adder"))
两个线程以循环方式相互连接,创建一个无限的加法循环。当前线程之间没有交换任何值,您可以看到它们 运行 例如 slime-list-threads
或任何其他实现提供的方式;无论如何(bt:all-threads)
returns一个列表。
slime-list-threads
10 adder Running
11 echo Running
...
添加一个项目,现在线程之间可以无限交换数据:
(push-queue 10 *q1*)
等等,然后阻止他们:
(push-queue :stop *q1*)
两个线程均正常停止(它们在线程列表中不再可见)。 我们可以检查队列中剩余的内容(结果因测试而异):
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(99 NIL)
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(:STOP NIL)
(list (try-pop-queue *q1*)
(try-pop-queue *q2*))
(NIL NIL)
中断线程
您创建了一个由消息或全局标志控制的服务,但随后出现错误,线程挂起。与其杀死它并丢失所有内容,不如至少适当地展开线程堆栈。这也很危险,但是您可以使用 bt:interrupt
立即停止线程 在 运行 的任何位置 并执行函数。
(define-condition stop () ())
(defun signal-stop ()
(signal 'stop))
(defun endless ()
(let ((output *standard-output*))
(lambda ()
(print "START" output)
(unwind-protect (handler-case (loop)
(stop ()
(print "INTERRUPTED" output)))
(print "STOP" output)))))
开始吧:
(bt:make-thread (endless) :name "loop")
这会打印 "START"
并循环。
然后我们打断它:
(bt:interrupt-thread (find "loop"
(bt:all-threads)
:test #'string=
:key #'bt:thread-name)
#'signal-stop)
打印如下:
"INTERRUPTED"
"STOP"
如果线程被终止,这些消息将不会被打印出来,但请注意,考虑到中断的随机性,您仍然可以设法获得损坏的数据。此外,它还可以解除阻塞调用,例如 sleep
或 pop-queue
.