如何实现 Go 并发映射或切片以更快地管理正在使用的资源?
How to implement Go concurrent map or slice for managing in-use resources faster?
Image 你有一个结构,它表示一次只有一个用户可以访问的资源。可能看起来像这样:
type Resource struct{
InUse bool//or int32/int64 is you want to use atomics
Resource string //parameters that map to the resource, think `/dev/chardeviceXXX`
}
这些资源的数量有限,用户会随机并发地请求访问它们,因此您将它们打包在管理器中
type ResourceManager struct{
Resources []*Resource //or a map
}
我正在尝试为经理找出最佳、安全的方法来创建一个函数 func (m *ResourceManager)GetUnusedResouce()(*Resouce,error)
,它将:
- 遍历所有资源,直到找到一个不是 InUse 的资源
- 将其标记为 InUse 并且 return 调用的 *Resouce context/goroutine
- 我会锁定以避免任何系统级锁定 (flock) 并在 Go 中完成这一切
- 还需要一个标记资源不再使用的功能
现在我在管理器中使用互斥体来锁定访问,因为我遍历整个切片。这是安全的,但我希望通过能够同时搜索已用资源并处理两个试图将相同资源标记为 InUse 的 goroutine 来加快速度。
更新
我特别想知道如果将资源 InUse
字段设置为 int64
然后使用 atomic.CompareAndSwapInt64
是否允许资源管理器在发现未使用的资源时立即锁定:
func (m *ResourceManager)GetUnusedResouce()(*Resouce,error){
for i := range Resources{
if atomic.CompareAndSwapInt64(&Resouces[i].InUse,1){
return Resouces[i],nil
}
}
return nil, errors.New("all resouces in use")
}
任何单元测试以更好地测试这一点也将不胜感激。
给定资源是否正在使用不是 Resource
本身的 属性,而是 ResourceManager
.
的 属性
事实上,没有必要跟踪正在使用的资源(除非您出于某种问题中未提及的原因需要跟踪)。正在使用的资源在释放时可以简单地放回池中。
这是一个使用通道的可能实现。不需要单个互斥锁,也不需要任何原子 CAS。
package main
import (
fmt "fmt"
"time"
)
type Resource struct {
Data string
}
type ResourceManager struct {
resources []*Resource
closeCh chan struct{}
acquireCh chan *Resource
releaseCh chan *Resource
}
func NewResourceManager() *ResourceManager {
r := &ResourceManager{
closeCh: make(chan struct{}),
acquireCh: make(chan *Resource),
releaseCh: make(chan *Resource),
}
go r.run()
return r
}
func (r *ResourceManager) run() {
defer close(r.acquireCh)
for {
if len(r.resources) > 0 {
select {
case r.acquireCh <- r.resources[len(r.resources)-1]:
r.resources = r.resources[:len(r.resources)-1]
case res := <-r.releaseCh:
r.resources = append(r.resources, res)
case <-r.closeCh:
return
}
} else {
select {
case res := <-r.releaseCh:
r.resources = append(r.resources, res)
case <-r.closeCh:
return
}
}
}
}
func (r *ResourceManager) AcquireResource() *Resource {
return <-r.acquireCh
}
func (r *ResourceManager) ReleaseResource(res *Resource) {
r.releaseCh <- res
}
func (r *ResourceManager) Close() {
close(r.closeCh)
}
// small demo below ...
func test(id int, r *ResourceManager) {
for {
res := r.AcquireResource()
fmt.Printf("test %d: %s\n", id, res.Data)
time.Sleep(time.Millisecond)
r.ReleaseResource(res)
}
}
func main() {
r := NewResourceManager()
r.ReleaseResource(&Resource{"Resource A"}) // initial setup
r.ReleaseResource(&Resource{"Resource B"}) // initial setup
go test(1, r)
go test(2, r)
go test(3, r) // 3 consumers, but only 2 resources ...
time.Sleep(time.Second)
r.Close()
}
问题中的GetUnusedResouce
函数可能会对所有资源执行比较和交换操作。根据资源数量和应用程序访问模式,执行少量受互斥锁保护的操作会更快。
使用单向链表实现快速的get和put操作。
type Resource struct {
next *Resource
Resource string
}
type ResourceManager struct {
free *Resource
mu sync.Mutex
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
m.mu.Lock()
defer m.mu.Unlock()
result := m.free
if m.free != nil {
m.free = m.free.next
}
return result
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.mu.Lock()
defer m.mu.Unlock()
r.next = m.free
m.free = r
}
这是在测试中使用的示例:
func TestResourceManager(t *testing.T) {
// Add free resources to a manager.
var m ResourceManager
m.Put(&Resource{Resource: "/dev/a"})
m.Put(&Resource{Resource: "/dev/b"})
// Test that we can get all resources from the pool.
ra := m.Get()
rb := m.Get()
if ra.Resource > rb.Resource {
// Sort ra, rb to make test independent of order.
ra, rb = rb, ra
}
if ra == nil || ra.Resource != "/dev/a" {
t.Errorf("ra is %v, want /dev/a", ra)
}
if rb == nil || rb.Resource != "/dev/b" {
t.Errorf("rb is %v, want /dev/b", rb)
}
// Check for empty pool.
r := m.Get()
if r != nil {
t.Errorf("r is %v, want nil", r)
}
// Return one resource and try again.
m.Put(ra)
ra = m.Get()
if ra == nil || ra.Resource != "/dev/a" {
t.Errorf("ra is %v, want /dev/a", ra)
}
r = m.Get()
if r != nil {
t.Errorf("r is %v, want nil", r)
}
}
Run the test on the playground.
如果资源数量已知且合理,则使用通道。这种方法利用了运行时高度优化的通道实现。
type Resource struct {
Resource string
}
type ResourceManager struct {
free chan *Resource
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
select {
case r := <-m.free:
return r
default:
return nil
}
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.free <- r
}
// NewResourceManager returns a manager that can hold up to
// n free resources.
func NewResourceManager(n int) *ResourceManager {
return &ResourceManager{free: make(chan *Resource, n)}
}
使用上面的 TestResourceManager
函数测试此实现,但将 var m ResourceManager
替换为 m := NewResourceManager(4)
。
Image 你有一个结构,它表示一次只有一个用户可以访问的资源。可能看起来像这样:
type Resource struct{
InUse bool//or int32/int64 is you want to use atomics
Resource string //parameters that map to the resource, think `/dev/chardeviceXXX`
}
这些资源的数量有限,用户会随机并发地请求访问它们,因此您将它们打包在管理器中
type ResourceManager struct{
Resources []*Resource //or a map
}
我正在尝试为经理找出最佳、安全的方法来创建一个函数 func (m *ResourceManager)GetUnusedResouce()(*Resouce,error)
,它将:
- 遍历所有资源,直到找到一个不是 InUse 的资源
- 将其标记为 InUse 并且 return 调用的 *Resouce context/goroutine
- 我会锁定以避免任何系统级锁定 (flock) 并在 Go 中完成这一切
- 还需要一个标记资源不再使用的功能
现在我在管理器中使用互斥体来锁定访问,因为我遍历整个切片。这是安全的,但我希望通过能够同时搜索已用资源并处理两个试图将相同资源标记为 InUse 的 goroutine 来加快速度。
更新
我特别想知道如果将资源 InUse
字段设置为 int64
然后使用 atomic.CompareAndSwapInt64
是否允许资源管理器在发现未使用的资源时立即锁定:
func (m *ResourceManager)GetUnusedResouce()(*Resouce,error){
for i := range Resources{
if atomic.CompareAndSwapInt64(&Resouces[i].InUse,1){
return Resouces[i],nil
}
}
return nil, errors.New("all resouces in use")
}
任何单元测试以更好地测试这一点也将不胜感激。
给定资源是否正在使用不是 Resource
本身的 属性,而是 ResourceManager
.
事实上,没有必要跟踪正在使用的资源(除非您出于某种问题中未提及的原因需要跟踪)。正在使用的资源在释放时可以简单地放回池中。
这是一个使用通道的可能实现。不需要单个互斥锁,也不需要任何原子 CAS。
package main
import (
fmt "fmt"
"time"
)
type Resource struct {
Data string
}
type ResourceManager struct {
resources []*Resource
closeCh chan struct{}
acquireCh chan *Resource
releaseCh chan *Resource
}
func NewResourceManager() *ResourceManager {
r := &ResourceManager{
closeCh: make(chan struct{}),
acquireCh: make(chan *Resource),
releaseCh: make(chan *Resource),
}
go r.run()
return r
}
func (r *ResourceManager) run() {
defer close(r.acquireCh)
for {
if len(r.resources) > 0 {
select {
case r.acquireCh <- r.resources[len(r.resources)-1]:
r.resources = r.resources[:len(r.resources)-1]
case res := <-r.releaseCh:
r.resources = append(r.resources, res)
case <-r.closeCh:
return
}
} else {
select {
case res := <-r.releaseCh:
r.resources = append(r.resources, res)
case <-r.closeCh:
return
}
}
}
}
func (r *ResourceManager) AcquireResource() *Resource {
return <-r.acquireCh
}
func (r *ResourceManager) ReleaseResource(res *Resource) {
r.releaseCh <- res
}
func (r *ResourceManager) Close() {
close(r.closeCh)
}
// small demo below ...
func test(id int, r *ResourceManager) {
for {
res := r.AcquireResource()
fmt.Printf("test %d: %s\n", id, res.Data)
time.Sleep(time.Millisecond)
r.ReleaseResource(res)
}
}
func main() {
r := NewResourceManager()
r.ReleaseResource(&Resource{"Resource A"}) // initial setup
r.ReleaseResource(&Resource{"Resource B"}) // initial setup
go test(1, r)
go test(2, r)
go test(3, r) // 3 consumers, but only 2 resources ...
time.Sleep(time.Second)
r.Close()
}
问题中的GetUnusedResouce
函数可能会对所有资源执行比较和交换操作。根据资源数量和应用程序访问模式,执行少量受互斥锁保护的操作会更快。
使用单向链表实现快速的get和put操作。
type Resource struct {
next *Resource
Resource string
}
type ResourceManager struct {
free *Resource
mu sync.Mutex
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
m.mu.Lock()
defer m.mu.Unlock()
result := m.free
if m.free != nil {
m.free = m.free.next
}
return result
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.mu.Lock()
defer m.mu.Unlock()
r.next = m.free
m.free = r
}
这是在测试中使用的示例:
func TestResourceManager(t *testing.T) {
// Add free resources to a manager.
var m ResourceManager
m.Put(&Resource{Resource: "/dev/a"})
m.Put(&Resource{Resource: "/dev/b"})
// Test that we can get all resources from the pool.
ra := m.Get()
rb := m.Get()
if ra.Resource > rb.Resource {
// Sort ra, rb to make test independent of order.
ra, rb = rb, ra
}
if ra == nil || ra.Resource != "/dev/a" {
t.Errorf("ra is %v, want /dev/a", ra)
}
if rb == nil || rb.Resource != "/dev/b" {
t.Errorf("rb is %v, want /dev/b", rb)
}
// Check for empty pool.
r := m.Get()
if r != nil {
t.Errorf("r is %v, want nil", r)
}
// Return one resource and try again.
m.Put(ra)
ra = m.Get()
if ra == nil || ra.Resource != "/dev/a" {
t.Errorf("ra is %v, want /dev/a", ra)
}
r = m.Get()
if r != nil {
t.Errorf("r is %v, want nil", r)
}
}
Run the test on the playground.
如果资源数量已知且合理,则使用通道。这种方法利用了运行时高度优化的通道实现。
type Resource struct {
Resource string
}
type ResourceManager struct {
free chan *Resource
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
select {
case r := <-m.free:
return r
default:
return nil
}
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.free <- r
}
// NewResourceManager returns a manager that can hold up to
// n free resources.
func NewResourceManager(n int) *ResourceManager {
return &ResourceManager{free: make(chan *Resource, n)}
}
使用上面的 TestResourceManager
函数测试此实现,但将 var m ResourceManager
替换为 m := NewResourceManager(4)
。