使用 vwait 的 Tcl 线程产生随机结果
Tcl Thread using vwait produces random result
我一直在尝试在Tcl 中实现一些多线程问题。在某些时候,我想在 Java 中模拟 Fork/Join 框架,其中一个线程生成两个“子”线程,每个线程完成一半的工作。为了在 Tcl 中执行此操作,您将使用启动脚本创建一些线程,让它们工作然后等待它们的结果。我的实现使用可连接线程,异步消息发送带有可选结果变量和该变量的 vwait
,但产生不规则结果。有时脚本会按预期完成,有时它会一直阻塞在 vwait
上。为什么有时会阻塞在vwait
,否则运行就好了?我不知道为什么会这样;据我所知,我遵循了手册页。
一些示例代码来演示该行为:
package require Thread
set code {
proc run {} {
puts "I am running"
return 4
}
thread::wait
}
set t1 [ thread::create -joinable $code ]
set t2 [ thread::create -joinable $code ]
thread::send -async $t1 "run" res1
thread::send -async $t2 "run" res2
puts "Waiting"
vwait res1
vwait res2
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining"
thread::join $t1
thread::join $t2
puts [ expr { $res1 + $res2 } ]
这有时会产生(如预期):
Waiting
I am running
I am running
Releasing
Joining
8
...有时:
Waiting
I am running
I am running
<keeps hanging here>
仅产生 1 个线程似乎永远不会重现问题,或者可能是潜在问题仍然存在但从未表现出来。希望有人能阐明为什么会发生这种情况。提前致谢!
您关于竞争条件的想法(线程 运行 按照第二个先执行的顺序排列)是正确的。
一种替代方法使用 shared variables 而不是让 run
proc return 一个值,并使用条件变量通知父线程 run
过程在之前完成杀死线程:
#!/usr/bin/env tclsh
package require Thread
set code {
proc run {m cond res} {
puts "$res is running"
tsv::incr results $res 4
thread::mutex lock $m
thread::cond notify $cond
thread::mutex unlock $m
}
thread::wait
}
set t1 [thread::create -joinable $code]
set t2 [thread::create -joinable $code]
set c1 [thread::cond create]
set c2 [thread::cond create]
set m1 [thread::mutex create]
set m2 [thread::mutex create]
thread::send -async $t1 [list run $m1 $c1 res1]
thread::send -async $t2 [list run $m2 $c2 res2]
puts "Running threads"
thread::mutex lock $m1
while {![tsv::exists results res1]} {
thread::cond wait $c1 $m1
}
thread::mutex unlock $m1
thread::mutex lock $m2
while {![tsv::exists results res2]} {
thread::cond wait $c2 $m2
}
thread::mutex unlock $m2
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining" ;# Not sure if this is needed after the release or even valid
thread::join $t1
thread::join $t2
puts [expr {[tsv::get results res1] + [tsv::get results res2]}]
线程可以 运行 任意顺序,但 vwait
只等待一个变量; $t2
有可能在 $t1
完成(并设置 res1
之前)完成其工作(并设置 res2
)。解决这个问题的一个简单方法是将要等待的变量放在一个数组中,然后 vwait
放在整个数组中:
thread::send -async $t1 "run" waiting(res1)
thread::send -async $t2 "run" waiting(res2)
puts "Waiting"
# Wait for the two sets to happen, in either order
vwait waiting
vwait waiting
puts "Releasing"
更一般地说,考虑改用线程池。
package require Thread
set code {
proc run {} {
puts "I am running"
return 4
}
# No thread::wait here!
}
set pool [tpool::create -maxworkers 2 -initcmd $code]
set task1 [tpool::post $pool run]
set task2 [tpool::post $pool run]
tpool::wait $pool $task1
tpool::wait $pool $task2
非常感谢到目前为止的好主意!我能够解决我自己的问题,因为当您仅用几行代码解释问题时,就更容易监督正在发生的事情。我也想分享我的想法。查看接受的答案以更好地理解问题。
我想出了一个替代解决方案,在让线程处理异步消息后使用同步消息发送。 Tcl手册规定:
Many threads can simultaneously send scripts to the target thread for execution. All of them are entered into the event queue of the target thread and executed on the FIFO basis [...].
... 这意味着发送到线程的脚本按照它们到达的顺序进行处理。您可以让线程接受一个新脚本,该脚本只是 returns 结果值,并同步发送该脚本以阻塞,直到您获得结果。这样,您根本不必使用 vwait
,因为您依赖于 FIFO 脚本发送系统。
解决方案代码如下所示:
package require Thread
set code {
set result 0
proc run {} {
upvar result result
puts "I am running"
set result 4
}
proc getResult {} {
upvar result result
return $result
}
thread::wait
}
set t1 [ thread::create -joinable $code ]
set t2 [ thread::create -joinable $code ]
thread::send -async $t1 "run" res1
thread::send -async $t2 "run" res2
puts "Waiting"
set res1 [ thread::send $t1 "getResult" ]
set res2 [ thread::send $t2 "getResult" ]
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining"
thread::join $t1
thread::join $t2
puts [ expr { $res1 + $res2 } ]
这是添加线程间通信的第三个选项的示例,使用通道管道 (chan pipe
) 作为通信工具(而不是互斥锁/条件,[=12] 也在内部使用它们=],如果我没记错的话):
package require Thread
set code {
proc run {ch} {
puts "[thread::id] is running"
puts $ch 4
close $ch
}
thread::wait
}
set handler [list {pr tid} {
variable busy
variable data
set r [read $pr]
if {[eof $pr]} {
close $pr
unset busy($tid)
thread::release $tid
} else {
lappend data $r
}
} [namespace current]]
set t1 [thread::create $code]
lassign [chan pipe] pr1 pw1
chan configure $pw1 -blocking 0
chan configure $pr1 -blocking 0
thread::transfer $t1 $pw1
chan event $pr1 readable [list apply $handler $pr1 $t1]
thread::send -async $t1 [list run $pw1]
set busy($t1) $pr1
set t2 [thread::create $code]
lassign [chan pipe] pr2 pw2
chan configure $pw2 -blocking 0
chan configure $pr2 -blocking 0
thread::transfer $t2 $pw2
chan event $pr2 readable [list apply $handler $pr2 $t2]
thread::send -async $t2 [list run $pw2]
set busy($t2) $pr2
while {[array size busy]} {
vwait ::busy
}
puts [tcl::mathop::+ {*}$data]
一些备注:
- 管道之所以优雅,是因为线程间通信遵循通道通信的模型(
chan event
, puts
/ read
);
- 它们可用于工作线程(不仅仅是主线程和工作线程)之间的通信;
- 上面的实现并不理想,因为不应在每个作业的基础上创建和销毁线程。因此,显然 tpool 使用起来更方便,更适合给定的情况;
- 管道也可以用作线程之间的辅助通信线路(在 tpool 中);
- 您必须引入一些过程抽象来隐藏基于管道的解决方案的冗长;
我一直在尝试在Tcl 中实现一些多线程问题。在某些时候,我想在 Java 中模拟 Fork/Join 框架,其中一个线程生成两个“子”线程,每个线程完成一半的工作。为了在 Tcl 中执行此操作,您将使用启动脚本创建一些线程,让它们工作然后等待它们的结果。我的实现使用可连接线程,异步消息发送带有可选结果变量和该变量的 vwait
,但产生不规则结果。有时脚本会按预期完成,有时它会一直阻塞在 vwait
上。为什么有时会阻塞在vwait
,否则运行就好了?我不知道为什么会这样;据我所知,我遵循了手册页。
一些示例代码来演示该行为:
package require Thread
set code {
proc run {} {
puts "I am running"
return 4
}
thread::wait
}
set t1 [ thread::create -joinable $code ]
set t2 [ thread::create -joinable $code ]
thread::send -async $t1 "run" res1
thread::send -async $t2 "run" res2
puts "Waiting"
vwait res1
vwait res2
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining"
thread::join $t1
thread::join $t2
puts [ expr { $res1 + $res2 } ]
这有时会产生(如预期):
Waiting
I am running
I am running
Releasing
Joining
8
...有时:
Waiting
I am running
I am running
<keeps hanging here>
仅产生 1 个线程似乎永远不会重现问题,或者可能是潜在问题仍然存在但从未表现出来。希望有人能阐明为什么会发生这种情况。提前致谢!
您关于竞争条件的想法(线程 运行 按照第二个先执行的顺序排列)是正确的。
一种替代方法使用 shared variables 而不是让 run
proc return 一个值,并使用条件变量通知父线程 run
过程在之前完成杀死线程:
#!/usr/bin/env tclsh
package require Thread
set code {
proc run {m cond res} {
puts "$res is running"
tsv::incr results $res 4
thread::mutex lock $m
thread::cond notify $cond
thread::mutex unlock $m
}
thread::wait
}
set t1 [thread::create -joinable $code]
set t2 [thread::create -joinable $code]
set c1 [thread::cond create]
set c2 [thread::cond create]
set m1 [thread::mutex create]
set m2 [thread::mutex create]
thread::send -async $t1 [list run $m1 $c1 res1]
thread::send -async $t2 [list run $m2 $c2 res2]
puts "Running threads"
thread::mutex lock $m1
while {![tsv::exists results res1]} {
thread::cond wait $c1 $m1
}
thread::mutex unlock $m1
thread::mutex lock $m2
while {![tsv::exists results res2]} {
thread::cond wait $c2 $m2
}
thread::mutex unlock $m2
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining" ;# Not sure if this is needed after the release or even valid
thread::join $t1
thread::join $t2
puts [expr {[tsv::get results res1] + [tsv::get results res2]}]
线程可以 运行 任意顺序,但 vwait
只等待一个变量; $t2
有可能在 $t1
完成(并设置 res1
之前)完成其工作(并设置 res2
)。解决这个问题的一个简单方法是将要等待的变量放在一个数组中,然后 vwait
放在整个数组中:
thread::send -async $t1 "run" waiting(res1)
thread::send -async $t2 "run" waiting(res2)
puts "Waiting"
# Wait for the two sets to happen, in either order
vwait waiting
vwait waiting
puts "Releasing"
更一般地说,考虑改用线程池。
package require Thread
set code {
proc run {} {
puts "I am running"
return 4
}
# No thread::wait here!
}
set pool [tpool::create -maxworkers 2 -initcmd $code]
set task1 [tpool::post $pool run]
set task2 [tpool::post $pool run]
tpool::wait $pool $task1
tpool::wait $pool $task2
非常感谢到目前为止的好主意!我能够解决我自己的问题,因为当您仅用几行代码解释问题时,就更容易监督正在发生的事情。我也想分享我的想法。查看接受的答案以更好地理解问题。 我想出了一个替代解决方案,在让线程处理异步消息后使用同步消息发送。 Tcl手册规定:
Many threads can simultaneously send scripts to the target thread for execution. All of them are entered into the event queue of the target thread and executed on the FIFO basis [...].
... 这意味着发送到线程的脚本按照它们到达的顺序进行处理。您可以让线程接受一个新脚本,该脚本只是 returns 结果值,并同步发送该脚本以阻塞,直到您获得结果。这样,您根本不必使用 vwait
,因为您依赖于 FIFO 脚本发送系统。
解决方案代码如下所示:
package require Thread
set code {
set result 0
proc run {} {
upvar result result
puts "I am running"
set result 4
}
proc getResult {} {
upvar result result
return $result
}
thread::wait
}
set t1 [ thread::create -joinable $code ]
set t2 [ thread::create -joinable $code ]
thread::send -async $t1 "run" res1
thread::send -async $t2 "run" res2
puts "Waiting"
set res1 [ thread::send $t1 "getResult" ]
set res2 [ thread::send $t2 "getResult" ]
puts "Releasing"
thread::release $t1
thread::release $t2
puts "Joining"
thread::join $t1
thread::join $t2
puts [ expr { $res1 + $res2 } ]
这是添加线程间通信的第三个选项的示例,使用通道管道 (chan pipe
) 作为通信工具(而不是互斥锁/条件,[=12] 也在内部使用它们=],如果我没记错的话):
package require Thread
set code {
proc run {ch} {
puts "[thread::id] is running"
puts $ch 4
close $ch
}
thread::wait
}
set handler [list {pr tid} {
variable busy
variable data
set r [read $pr]
if {[eof $pr]} {
close $pr
unset busy($tid)
thread::release $tid
} else {
lappend data $r
}
} [namespace current]]
set t1 [thread::create $code]
lassign [chan pipe] pr1 pw1
chan configure $pw1 -blocking 0
chan configure $pr1 -blocking 0
thread::transfer $t1 $pw1
chan event $pr1 readable [list apply $handler $pr1 $t1]
thread::send -async $t1 [list run $pw1]
set busy($t1) $pr1
set t2 [thread::create $code]
lassign [chan pipe] pr2 pw2
chan configure $pw2 -blocking 0
chan configure $pr2 -blocking 0
thread::transfer $t2 $pw2
chan event $pr2 readable [list apply $handler $pr2 $t2]
thread::send -async $t2 [list run $pw2]
set busy($t2) $pr2
while {[array size busy]} {
vwait ::busy
}
puts [tcl::mathop::+ {*}$data]
一些备注:
- 管道之所以优雅,是因为线程间通信遵循通道通信的模型(
chan event
,puts
/read
); - 它们可用于工作线程(不仅仅是主线程和工作线程)之间的通信;
- 上面的实现并不理想,因为不应在每个作业的基础上创建和销毁线程。因此,显然 tpool 使用起来更方便,更适合给定的情况;
- 管道也可以用作线程之间的辅助通信线路(在 tpool 中);
- 您必须引入一些过程抽象来隐藏基于管道的解决方案的冗长;