Nim 线程间消息传递:如何避免全局 TChannel?
Nim inter-thread message passing: How to avoid a global TChannel?
我有以下线程间通信问题的简单示例:我想在后台线程中 运行 任意 "anytime" 算法。 anytime 算法增量地执行一些结果类型 T
的计算,即,它偶尔会产生更新、更精确的结果。用 Nim 的说法,它们可能最好用迭代器来表示。在主线程中,我现在想将这样的迭代器分别包装在自己的线程中,并有可能在线程中查询 "is there a new value available" 或 "what is the current computation result" 之类的东西。
由于我不熟悉 Nim 的并发概念,因此无法实现所需的线程间通信。我的想法是使用 TChannel
进行通信。根据this forum post,TChannel
不能与spawn
组合使用,但需要使用createThread
。我设法编译了以下内容 运行:
import os, threadpool
proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =
type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]
# I think I have to wrap the iterator to pass it to createThread
proc threadFunc(args: Args) {.thread.} =
echo "Thread is starting"
let iter = args.iter
var channel = args.channel[]
for i in iter():
echo "Sending ", i
channel.send(i)
var thread: TThread[Args]
var channel: TChannel[T]
channel.open()
let args = (f, channel.addr)
createThread(thread, threadFunc, args)
result = channel
# example use in some main thread:
iterator test(): int {.closure.} =
sleep(500)
yield 1
sleep(500)
yield 2
var channel = spawnBackgroundJob[int](test)
for i in 0 .. 10:
sleep(200)
echo channel.peek()
echo "Finished"
不幸的是,这没有预期的行为,即我从未在主线程中收到任何东西。我在 IRC 上被告知问题是我没有使用全局变量。但即使经过很长时间的思考,我也不知道为什么会失败,也不知道是否有办法解决它。问题是我不能简单地将变量 thread
和 channel
设为全局变量,因为它们取决于类型 T
。我还想避免将其限制为仅 运行 单个随时算法(或其他一些固定数字 N)。我还被告知该方法总体上没有真正意义,所以也许我只是想念这个问题有一个完全不同的解决方案?
原因:
您在发送和接收中使用了两个不同的通道。
Nim 中的对象分配是深拷贝,它们是不同的对象。
var channel = args.channel[]
和
result = channel
为了解释它,请看下面的代码片段:
type
A = object
x: int
y: int
var a,b: A
var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation
a = c[]
b = c[]
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000
a.x = 1
a.y = 2
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000
b.x = 3
b.y = 4
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
通道进出proc的解决方案:
要将 Channel 作为参数和 return 值传递,请参考 Jehan 在 Nim forum 中的回答。
在这里粘贴 Jehan 的答案以供快速参考,并使其在 Nim 0.11.2 中通过编译
type SharedChannel[T] = ptr TChannel[T]
proc newSharedChannel[T](): SharedChannel[T] =
result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
open(result[])
proc close[T](ch: var SharedChannel[T]) =
close(ch[])
deallocShared(ch)
ch = nil
proc send[T](ch: SharedChannel[T], content: T) =
ch[].send(content)
proc recv[T](ch: SharedChannel[T]): T =
result = ch[].recv
proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
let (mainChannel, responseChannel) = ch
while true:
let s = mainChannel.recv
if s == nil:
break
echo s
responseChannel.send(true)
responseChannel.send(false)
proc main() =
var
mainChannel = newSharedChannel[string]()
responseChannel = newSharedChannel[bool]()
th: TThread[(SharedChannel[string], SharedChannel[bool])]
createThread(th, someThread, (mainChannel, responseChannel))
for i in 0..2:
echo("main thread send: " & $i)
mainChannel.send($i)
if not responseChannel.recv:
break
mainChannel.send(nil)
joinThread(th)
close(mainChannel)
close(responseChannel)
main()
输出:
main thread send: 0
0
main thread send: 1
1
main thread send: 2
2
再一步,本题解答:
import os, threadpool, macros
template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
block:
proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
echo "Thread is starting"
for i in iter:
echo "Sending ", i
channel[].send(i)
channel[].open()
var thread: TThread[ptr TChannel[t]]
createThread(thread, threadFunc, chan)
#joinThread(thread)
# example use in some main thread:
iterator testJob(): int =
yield 0
sleep(500)
yield 1
sleep(500)
yield 2
var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())
for i in 1 .. 10:
sleep(200)
echo channel[].peek()
channel[].close()
我有以下线程间通信问题的简单示例:我想在后台线程中 运行 任意 "anytime" 算法。 anytime 算法增量地执行一些结果类型 T
的计算,即,它偶尔会产生更新、更精确的结果。用 Nim 的说法,它们可能最好用迭代器来表示。在主线程中,我现在想将这样的迭代器分别包装在自己的线程中,并有可能在线程中查询 "is there a new value available" 或 "what is the current computation result" 之类的东西。
由于我不熟悉 Nim 的并发概念,因此无法实现所需的线程间通信。我的想法是使用 TChannel
进行通信。根据this forum post,TChannel
不能与spawn
组合使用,但需要使用createThread
。我设法编译了以下内容 运行:
import os, threadpool
proc spawnBackgroundJob[T](f: iterator (): T): TChannel[T] =
type Args = tuple[iter: iterator (): T, channel: ptr TChannel[T]]
# I think I have to wrap the iterator to pass it to createThread
proc threadFunc(args: Args) {.thread.} =
echo "Thread is starting"
let iter = args.iter
var channel = args.channel[]
for i in iter():
echo "Sending ", i
channel.send(i)
var thread: TThread[Args]
var channel: TChannel[T]
channel.open()
let args = (f, channel.addr)
createThread(thread, threadFunc, args)
result = channel
# example use in some main thread:
iterator test(): int {.closure.} =
sleep(500)
yield 1
sleep(500)
yield 2
var channel = spawnBackgroundJob[int](test)
for i in 0 .. 10:
sleep(200)
echo channel.peek()
echo "Finished"
不幸的是,这没有预期的行为,即我从未在主线程中收到任何东西。我在 IRC 上被告知问题是我没有使用全局变量。但即使经过很长时间的思考,我也不知道为什么会失败,也不知道是否有办法解决它。问题是我不能简单地将变量 thread
和 channel
设为全局变量,因为它们取决于类型 T
。我还想避免将其限制为仅 运行 单个随时算法(或其他一些固定数字 N)。我还被告知该方法总体上没有真正意义,所以也许我只是想念这个问题有一个完全不同的解决方案?
原因:
您在发送和接收中使用了两个不同的通道。
Nim 中的对象分配是深拷贝,它们是不同的对象。
var channel = args.channel[]
和
result = channel
为了解释它,请看下面的代码片段:
type
A = object
x: int
y: int
var a,b: A
var c = cast[ptr A](allocShared0(sizeof(A))) # shared memory allocation
a = c[]
b = c[]
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 000000
a.x = 1
a.y = 2
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 120000
b.x = 3
b.y = 4
echo a.x, a.y, b.x, b.y, c.x, c.y # output: 123400
通道进出proc的解决方案:
要将 Channel 作为参数和 return 值传递,请参考 Jehan 在 Nim forum 中的回答。
在这里粘贴 Jehan 的答案以供快速参考,并使其在 Nim 0.11.2 中通过编译
type SharedChannel[T] = ptr TChannel[T]
proc newSharedChannel[T](): SharedChannel[T] =
result = cast[SharedChannel[T]](allocShared0(sizeof(TChannel[T])))
open(result[])
proc close[T](ch: var SharedChannel[T]) =
close(ch[])
deallocShared(ch)
ch = nil
proc send[T](ch: SharedChannel[T], content: T) =
ch[].send(content)
proc recv[T](ch: SharedChannel[T]): T =
result = ch[].recv
proc someThread(ch: (SharedChannel[string], SharedChannel[bool])) {.thread.} =
let (mainChannel, responseChannel) = ch
while true:
let s = mainChannel.recv
if s == nil:
break
echo s
responseChannel.send(true)
responseChannel.send(false)
proc main() =
var
mainChannel = newSharedChannel[string]()
responseChannel = newSharedChannel[bool]()
th: TThread[(SharedChannel[string], SharedChannel[bool])]
createThread(th, someThread, (mainChannel, responseChannel))
for i in 0..2:
echo("main thread send: " & $i)
mainChannel.send($i)
if not responseChannel.recv:
break
mainChannel.send(nil)
joinThread(th)
close(mainChannel)
close(responseChannel)
main()
输出:
main thread send: 0
0
main thread send: 1
1
main thread send: 2
2
再一步,本题解答:
import os, threadpool, macros
template spawnBackgroundJob(t: typedesc, chan:ptr TChannel[t], iter: expr): stmt {.immediate.}=
block:
proc threadFunc(channel: ptr TChannel[t]) {.thread.} =
echo "Thread is starting"
for i in iter:
echo "Sending ", i
channel[].send(i)
channel[].open()
var thread: TThread[ptr TChannel[t]]
createThread(thread, threadFunc, chan)
#joinThread(thread)
# example use in some main thread:
iterator testJob(): int =
yield 0
sleep(500)
yield 1
sleep(500)
yield 2
var channel: ptr TChannel[int]
channel = cast[ptr TChannel[int]](allocShared0(sizeof(TChannel[int])))
spawnBackgroundJob(type(int), channel, testJob())
for i in 1 .. 10:
sleep(200)
echo channel[].peek()
channel[].close()