从不同的线程读取值
Reading values from a different thread
我正在用 Go 编写软件来进行大量并行计算。我想从工作线程收集数据,但我不确定如何以安全的方式进行。我知道我可以使用通道,但在我的场景中它们使它变得更加复杂,因为我必须以某种方式在主线程中同步消息(等待每个线程发送一些东西)。
场景
主线程创建 n Worker
个实例并在 goroutine 中启动它们的 work()
方法,以便每个工作人员 运行 在他们的自己的线程。每隔 10 秒,主线程应该从 workers 收集一些简单的值(例如迭代计数)并打印一个综合统计信息。
问题
从工人那里读取值安全吗?主线程只会读取值,每个单独的线程都会写入自己的值。如果在读取时这些值有几纳秒的偏差就没问题。
关于如何以简单的方式实现它的任何其他想法?
在 Go 中,如果至少有一个访问是写操作,那么对于没有同步的多个 goroutine 的并发访问,没有任何值是安全的。您的案例满足列出的条件,因此您必须使用某种同步,否则行为将不确定。
如果 goroutine 想要将值发送给另一个,则使用通道。您的情况不完全是这样:您不希望您的工作人员每 10 秒发送一次更新,您希望您的主 goroutine 每 10 秒 fetch 状态。
所以在这个例子中我只是用sync.RWMutex
保护数据:当工作人员想要修改这个数据时,他们必须获得一个写锁。当主goroutine要读取这个数据时,它必须获得一个读锁。
一个简单的实现可能如下所示:
type Worker struct {
iterMu sync.RWMutex
iter int
}
func (w *Worker) Iter() int {
w.iterMu.RLock()
defer w.iterMu.RUnlock()
return w.iter
}
func (w *Worker) setIter(n int) {
w.iterMu.Lock()
w.iter = n
w.iterMu.Unlock()
}
func (w *Worker) incIter() {
w.iterMu.Lock()
w.iter++
w.iterMu.Unlock()
}
使用此示例 Worker
,主协程可以使用 Worker.Iter()
获取迭代,而 worker 本身可以使用 Worker.setIter()
或 Worker.incIter()
更改/更新迭代在任何时候,无需任何额外的同步。通过正确使用 Worker.iterMu
.
来确保同步
对于迭代计数器,您也可以使用 sync/atomic
包。如果你选择这个,你只能使用 atomic
包的函数读取/修改迭代计数器,如下所示:
type Worker struct {
iter int64
}
func (w *Worker) Iter() int64 {
return atomic.LoadInt64(&w.iter)
}
func (w *Worker) setIter(n int64) {
atomic.StoreInt64(&w.iter, n)
}
func (w *Worker) incIter() {
atomic.AddInt64(&w.iter, 1)
}
我正在用 Go 编写软件来进行大量并行计算。我想从工作线程收集数据,但我不确定如何以安全的方式进行。我知道我可以使用通道,但在我的场景中它们使它变得更加复杂,因为我必须以某种方式在主线程中同步消息(等待每个线程发送一些东西)。
场景
主线程创建 n Worker
个实例并在 goroutine 中启动它们的 work()
方法,以便每个工作人员 运行 在他们的自己的线程。每隔 10 秒,主线程应该从 workers 收集一些简单的值(例如迭代计数)并打印一个综合统计信息。
问题
从工人那里读取值安全吗?主线程只会读取值,每个单独的线程都会写入自己的值。如果在读取时这些值有几纳秒的偏差就没问题。
关于如何以简单的方式实现它的任何其他想法?
在 Go 中,如果至少有一个访问是写操作,那么对于没有同步的多个 goroutine 的并发访问,没有任何值是安全的。您的案例满足列出的条件,因此您必须使用某种同步,否则行为将不确定。
如果 goroutine 想要将值发送给另一个,则使用通道。您的情况不完全是这样:您不希望您的工作人员每 10 秒发送一次更新,您希望您的主 goroutine 每 10 秒 fetch 状态。
所以在这个例子中我只是用sync.RWMutex
保护数据:当工作人员想要修改这个数据时,他们必须获得一个写锁。当主goroutine要读取这个数据时,它必须获得一个读锁。
一个简单的实现可能如下所示:
type Worker struct {
iterMu sync.RWMutex
iter int
}
func (w *Worker) Iter() int {
w.iterMu.RLock()
defer w.iterMu.RUnlock()
return w.iter
}
func (w *Worker) setIter(n int) {
w.iterMu.Lock()
w.iter = n
w.iterMu.Unlock()
}
func (w *Worker) incIter() {
w.iterMu.Lock()
w.iter++
w.iterMu.Unlock()
}
使用此示例 Worker
,主协程可以使用 Worker.Iter()
获取迭代,而 worker 本身可以使用 Worker.setIter()
或 Worker.incIter()
更改/更新迭代在任何时候,无需任何额外的同步。通过正确使用 Worker.iterMu
.
对于迭代计数器,您也可以使用 sync/atomic
包。如果你选择这个,你只能使用 atomic
包的函数读取/修改迭代计数器,如下所示:
type Worker struct {
iter int64
}
func (w *Worker) Iter() int64 {
return atomic.LoadInt64(&w.iter)
}
func (w *Worker) setIter(n int64) {
atomic.StoreInt64(&w.iter, n)
}
func (w *Worker) incIter() {
atomic.AddInt64(&w.iter, 1)
}