在 Lparallel 库 (Common Lisp) 中使用队列
Using Queues in Lparallel Library (Common Lisp)
https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels 的 lparallel 库中关于队列的基本讨论说队列 "enable message passing between worker threads." 下面的测试使用共享队列来协调主线程和从线程,其中主线程只是等待下属退出前完成:
(defun foo (q)
(sleep 1)
(lparallel.queue:pop-queue q)) ;q is now empty
(defun test ()
(setf lparallel:*kernel* (lparallel:make-kernel 1))
(let ((c (lparallel:make-channel))
(q (lparallel.queue:make-queue)))
(lparallel.queue:push-queue 0 q)
(lparallel:submit-task c #'foo q)
(loop do (sleep .2)
(print (lparallel.queue:peek-queue q))
when (lparallel.queue:queue-empty-p q)
do (return)))
(lparallel:end-kernel :wait t))
这按预期产生输出:
* (test)
0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)
我的问题是关于我是否正确或完整地使用了 lparallel 的队列功能。队列似乎只是使用全局变量来保存线程共享对象的替代品。使用队列的设计优势是什么?为每个提交的任务分配一个队列通常是好的做法吗(假设任务需要通信)?感谢任何更深入的见解。
多线程工作是通过管理对可变对象的并发访问来完成的
共享状态,即你有一个公共数据结构的锁,
每个线程读取或写入它。
但是建议尽量减少正在处理的数据数量
同时访问。队列是一种将工作人员与每个工作人员解耦的方法
其他,通过让每个线程管理其本地状态并交换数据
仅通过消息;这是线程安全的,因为访问
队列由 锁和条件控制
变量.
你在主线程中做的是轮询队列
是空的;这可能有效,但适得其反,因为队列
被用作同步机制,但在这里你正在做
自己同步。
(ql:quickload :lparallel)
(defpackage :so (:use :cl
:lparallel
:lparallel.queue
:lparallel.kernel-util))
(in-package :so)
让我们更改 foo
使其获得两个队列,一个用于传入
请求,一个用于答复。在这里,我们执行一个简单的转换
发送的数据和每个输入消息,只有一个
输出消息,但情况并非总是如此。
(defun foo (in out)
(push-queue (1+ (pop-queue in)) out))
更改 test
以便控制流仅基于 reading/writing 到队列:
(defun test ()
(with-temp-kernel (1)
(let ((c (make-channel))
(foo-in (make-queue))
(foo-out (make-queue)))
(submit-task c #'foo foo-in foo-out)
;; submit data to task (could be blocking)
(push-queue 0 foo-in)
;; wait for message from task (could be blocking too)
(pop-queue foo-out))))
But how can you can avoid polling in test if there are multiple tasks running? Don’t you need to continuously check when any one of them is done so you can push-queue more work to it?
您可以使用不同的并发机制,类似于 listen and poll/epoll,您可以在其中监视多个
事件的来源,并在其中一个准备就绪时做出反应。有像 Go (select) and Erlang (receive) 这样的语言,其中
这表达起来很自然。在 Lisp 方面,Calispel 库提供了类似的交替机制(pri-alt
和 fair-alt
)。例如,以下它取自Calispel的测试代码:
(pri-alt ((? control msg)
(ecase msg
(:clean-up (setf cleanup? t))
(:high-speed (setf slow? nil))
(:low-speed (setf slow? t))))
((? channel msg)
(declare (type fixnum msg))
(vector-push-extend msg out))
((otherwise :timeout (if cleanup? 0 nil))
(! reader-results out)
(! thread-expiration (bt:current-thread))
(return)))
在 lparallel 的情况下,没有这样的机制,但如果您使用标识符标记消息,您可以只使用队列。
如果您需要在 任务 t1
或 t2
给出结果后立即做出反应,请将这两个任务写入相同的结果通道:
(let ((t1 (foo :id 1 :in i1 :out res))
(t2 (bar :id 2 :in i2 :out res)))
(destructuring-bind (id message) (pop-queue res)
(case id
(1 ...)
(2 ...))))
如果您需要在 t1
和 t2
发出结果时同步代码,让它们在不同的通道中写入:
(let ((t1 (foo :id 1 :in i1 :out o1))
(t2 (bar :id 2 :in i2 :out o2)))
(list (pop-queue o1)
(pop-queue o2)))
https://z0ltan.wordpress.com/2016/09/09/basic-concurrency-and-parallelism-in-common-lisp-part-4a-parallelism-using-lparallel-fundamentals/#channels 的 lparallel 库中关于队列的基本讨论说队列 "enable message passing between worker threads." 下面的测试使用共享队列来协调主线程和从线程,其中主线程只是等待下属退出前完成:
(defun foo (q)
(sleep 1)
(lparallel.queue:pop-queue q)) ;q is now empty
(defun test ()
(setf lparallel:*kernel* (lparallel:make-kernel 1))
(let ((c (lparallel:make-channel))
(q (lparallel.queue:make-queue)))
(lparallel.queue:push-queue 0 q)
(lparallel:submit-task c #'foo q)
(loop do (sleep .2)
(print (lparallel.queue:peek-queue q))
when (lparallel.queue:queue-empty-p q)
do (return)))
(lparallel:end-kernel :wait t))
这按预期产生输出:
* (test)
0
0
0
0
NIL
(#<SB-THREAD:THREAD "lparallel" FINISHED values: NIL {10068F2B03}>)
我的问题是关于我是否正确或完整地使用了 lparallel 的队列功能。队列似乎只是使用全局变量来保存线程共享对象的替代品。使用队列的设计优势是什么?为每个提交的任务分配一个队列通常是好的做法吗(假设任务需要通信)?感谢任何更深入的见解。
多线程工作是通过管理对可变对象的并发访问来完成的 共享状态,即你有一个公共数据结构的锁, 每个线程读取或写入它。
但是建议尽量减少正在处理的数据数量 同时访问。队列是一种将工作人员与每个工作人员解耦的方法 其他,通过让每个线程管理其本地状态并交换数据 仅通过消息;这是线程安全的,因为访问 队列由 锁和条件控制 变量.
你在主线程中做的是轮询队列 是空的;这可能有效,但适得其反,因为队列 被用作同步机制,但在这里你正在做 自己同步。
(ql:quickload :lparallel)
(defpackage :so (:use :cl
:lparallel
:lparallel.queue
:lparallel.kernel-util))
(in-package :so)
让我们更改 foo
使其获得两个队列,一个用于传入
请求,一个用于答复。在这里,我们执行一个简单的转换
发送的数据和每个输入消息,只有一个
输出消息,但情况并非总是如此。
(defun foo (in out)
(push-queue (1+ (pop-queue in)) out))
更改 test
以便控制流仅基于 reading/writing 到队列:
(defun test ()
(with-temp-kernel (1)
(let ((c (make-channel))
(foo-in (make-queue))
(foo-out (make-queue)))
(submit-task c #'foo foo-in foo-out)
;; submit data to task (could be blocking)
(push-queue 0 foo-in)
;; wait for message from task (could be blocking too)
(pop-queue foo-out))))
But how can you can avoid polling in test if there are multiple tasks running? Don’t you need to continuously check when any one of them is done so you can push-queue more work to it?
您可以使用不同的并发机制,类似于 listen and poll/epoll,您可以在其中监视多个
事件的来源,并在其中一个准备就绪时做出反应。有像 Go (select) and Erlang (receive) 这样的语言,其中
这表达起来很自然。在 Lisp 方面,Calispel 库提供了类似的交替机制(pri-alt
和 fair-alt
)。例如,以下它取自Calispel的测试代码:
(pri-alt ((? control msg)
(ecase msg
(:clean-up (setf cleanup? t))
(:high-speed (setf slow? nil))
(:low-speed (setf slow? t))))
((? channel msg)
(declare (type fixnum msg))
(vector-push-extend msg out))
((otherwise :timeout (if cleanup? 0 nil))
(! reader-results out)
(! thread-expiration (bt:current-thread))
(return)))
在 lparallel 的情况下,没有这样的机制,但如果您使用标识符标记消息,您可以只使用队列。
如果您需要在 任务 t1
或 t2
给出结果后立即做出反应,请将这两个任务写入相同的结果通道:
(let ((t1 (foo :id 1 :in i1 :out res))
(t2 (bar :id 2 :in i2 :out res)))
(destructuring-bind (id message) (pop-queue res)
(case id
(1 ...)
(2 ...))))
如果您需要在 t1
和 t2
发出结果时同步代码,让它们在不同的通道中写入:
(let ((t1 (foo :id 1 :in i1 :out o1))
(t2 (bar :id 2 :in i2 :out o2)))
(list (pop-queue o1)
(pop-queue o2)))