Go - 如果为空则等待优先级队列中的下一个项目
Go - wait for next item in a priority queue if empty
我正在尝试实现优先级队列以根据优先级通过网络套接字发送 json 对象。我正在使用 container/heap
包来实现队列。我想到了这样的事情:
for {
if pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
jsonEncoder.Encode(&item)
} else {
time.Sleep(10 * time.Millisecond)
}
}
是否有比轮询优先级队列更好的等待新项目的方法?
一种方法是使用 sync.Cond
:
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
包中的示例可以修改如下(对于消费者):
c.L.Lock()
for heap.Len() == 0 {
c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item
生产者可以简单地做:
c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()
(将它们包装在函数中并使用延迟可能是个好主意。)
这里是一个线程安全(原始)堆的例子,pop 方法等待直到项目可用:
package main
import (
"fmt"
"sort"
"sync"
"time"
"math/rand"
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// There is definitely something in there
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // will wake up a popper
h.c.L.Lock()
defer h.c.L.Unlock()
// Add and sort to maintain priority (not really how the heap works)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n < 3; n++ {
x := rand.Intn(100)
fmt.Println("push:", x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println("pop: ", item)
}
}
(请注意,由于 for range time.Tick
循环,这在操场上不起作用。运行 在本地。)
我可能会使用几个队列 goroutine。从 PriorityQueue example 中的数据结构开始,我将构建如下函数:
http://play.golang.org/p/hcNFX8ehBW
func queue(in <-chan *Item, out chan<- *Item) {
// Make us a queue!
pq := make(PriorityQueue, 0)
heap.Init(&pq)
var currentItem *Item // Our item "in hand"
var currentIn = in // Current input channel (may be nil sometimes)
var currentOut chan<- *Item // Current output channel (starts nil until we have something)
defer close(out)
for {
select {
// Read from the input
case item, ok := <-currentIn:
if !ok {
// The input has been closed. Don't keep trying to read it
currentIn = nil
// If there's nothing pending to write, we're done
if currentItem == nil {
return
}
continue
}
// Were we holding something to write? Put it back.
if currentItem != nil {
heap.Push(&pq, currentItem)
}
// Put our new thing on the queue
heap.Push(&pq, item)
// Turn on the output queue if it's not turned on
currentOut = out
// Grab our best item. We know there's at least one. We just put it there.
currentItem = heap.Pop(&pq).(*Item)
// Write to the output
case currentOut <- currentItem:
// OK, we wrote. Is there anything else?
if len(pq) > 0 {
// Hold onto it for next time
currentItem = heap.Pop(&pq).(*Item)
} else {
// Oh well, nothing to write. Is the input stream done?
if currentIn == nil {
// Then we're done
return
}
// Otherwise, turn off the output stream for now.
currentItem = nil
currentOut = nil
}
}
}
}
这是一个使用它的例子:
func main() {
// Some items and their priorities.
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
out := make(chan *Item) // But the system will "work" for any particular values
// Start the queuing engine!
go queue(in, out)
// Stick some stuff on in another goroutine
go func() {
i := 0
for value, priority := range items {
in <- &Item{
value: value,
priority: priority,
index: i,
}
i++
}
close(in)
}()
// Read the results
for item := range out {
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
fmt.Println()
}
请注意,如果您 运行 这个例子,每次的顺序都会有点不同。这当然是预料之中的。这取决于输入和输出通道的速度 运行.
我正在尝试实现优先级队列以根据优先级通过网络套接字发送 json 对象。我正在使用 container/heap
包来实现队列。我想到了这样的事情:
for {
if pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
jsonEncoder.Encode(&item)
} else {
time.Sleep(10 * time.Millisecond)
}
}
是否有比轮询优先级队列更好的等待新项目的方法?
一种方法是使用 sync.Cond
:
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
包中的示例可以修改如下(对于消费者):
c.L.Lock()
for heap.Len() == 0 {
c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item
生产者可以简单地做:
c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()
(将它们包装在函数中并使用延迟可能是个好主意。)
这里是一个线程安全(原始)堆的例子,pop 方法等待直到项目可用:
package main
import (
"fmt"
"sort"
"sync"
"time"
"math/rand"
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// There is definitely something in there
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // will wake up a popper
h.c.L.Lock()
defer h.c.L.Unlock()
// Add and sort to maintain priority (not really how the heap works)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n < 3; n++ {
x := rand.Intn(100)
fmt.Println("push:", x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println("pop: ", item)
}
}
(请注意,由于 for range time.Tick
循环,这在操场上不起作用。运行 在本地。)
我可能会使用几个队列 goroutine。从 PriorityQueue example 中的数据结构开始,我将构建如下函数:
http://play.golang.org/p/hcNFX8ehBW
func queue(in <-chan *Item, out chan<- *Item) {
// Make us a queue!
pq := make(PriorityQueue, 0)
heap.Init(&pq)
var currentItem *Item // Our item "in hand"
var currentIn = in // Current input channel (may be nil sometimes)
var currentOut chan<- *Item // Current output channel (starts nil until we have something)
defer close(out)
for {
select {
// Read from the input
case item, ok := <-currentIn:
if !ok {
// The input has been closed. Don't keep trying to read it
currentIn = nil
// If there's nothing pending to write, we're done
if currentItem == nil {
return
}
continue
}
// Were we holding something to write? Put it back.
if currentItem != nil {
heap.Push(&pq, currentItem)
}
// Put our new thing on the queue
heap.Push(&pq, item)
// Turn on the output queue if it's not turned on
currentOut = out
// Grab our best item. We know there's at least one. We just put it there.
currentItem = heap.Pop(&pq).(*Item)
// Write to the output
case currentOut <- currentItem:
// OK, we wrote. Is there anything else?
if len(pq) > 0 {
// Hold onto it for next time
currentItem = heap.Pop(&pq).(*Item)
} else {
// Oh well, nothing to write. Is the input stream done?
if currentIn == nil {
// Then we're done
return
}
// Otherwise, turn off the output stream for now.
currentItem = nil
currentOut = nil
}
}
}
}
这是一个使用它的例子:
func main() {
// Some items and their priorities.
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
out := make(chan *Item) // But the system will "work" for any particular values
// Start the queuing engine!
go queue(in, out)
// Stick some stuff on in another goroutine
go func() {
i := 0
for value, priority := range items {
in <- &Item{
value: value,
priority: priority,
index: i,
}
i++
}
close(in)
}()
// Read the results
for item := range out {
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
fmt.Println()
}
请注意,如果您 运行 这个例子,每次的顺序都会有点不同。这当然是预料之中的。这取决于输入和输出通道的速度 运行.