Go - 如果为空则等待优先级队列中的下一个项目

Go - wait for next item in a priority queue if empty

我正在尝试实现优先级队列以根据优先级通过网络套接字发送 json 对象。我正在使用 container/heap 包来实现队列。我想到了这样的事情:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

是否有比轮询优先级队列更好的等待新项目的方法?

一种方法是使用 sync.Cond:

Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.

包中的示例可以修改如下(对于消费者):

c.L.Lock()
for heap.Len() == 0 {
    c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item

生产者可以简单地做:

c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

(将它们包装在函数中并使用延迟可能是个好主意。)

这里是一个线程安全(原始)堆的例子,pop 方法等待直到项目可用:

package main

import (
    "fmt"
    "sort"
    "sync"
    "time"
    "math/rand"
)

type Heap struct {
    b []int
    c *sync.Cond
}

func NewHeap() *Heap {
    return &Heap{c: sync.NewCond(new(sync.Mutex))}
}

// Pop (waits until anything available)
func (h *Heap) Pop() int {
    h.c.L.Lock()
    defer h.c.L.Unlock()
    for len(h.b) == 0 {
        h.c.Wait()
    }
    // There is definitely something in there
    x := h.b[len(h.b)-1]
    h.b = h.b[:len(h.b)-1]
    return x
}

func (h *Heap) Push(x int) {
    defer h.c.Signal() // will wake up a popper
    h.c.L.Lock()
    defer h.c.L.Unlock()
    // Add and sort to maintain priority (not really how the heap works)
    h.b = append(h.b, x)
    sort.Ints(h.b)
}

func main() {
    heap := NewHeap()

    go func() {
        for range time.Tick(time.Second) {
            for n := 0; n < 3; n++ {
                x := rand.Intn(100)
                fmt.Println("push:", x)
                heap.Push(x)
            }
        }
    }()

    for {
        item := heap.Pop()
        fmt.Println("pop: ", item)
    }
}

(请注意,由于 for range time.Tick 循环,这在操场上不起作用。运行 在本地。)

我可能会使用几个队列 goroutine。从 PriorityQueue example 中的数据结构开始,我将构建如下函数:

http://play.golang.org/p/hcNFX8ehBW

func queue(in <-chan *Item, out chan<- *Item) {
    // Make us a queue!
    pq := make(PriorityQueue, 0)
    heap.Init(&pq)

    var currentItem *Item       // Our item "in hand"
    var currentIn = in          // Current input channel (may be nil sometimes)
    var currentOut chan<- *Item // Current output channel (starts nil until we have something)

    defer close(out)

    for {
        select {
        // Read from the input
        case item, ok := <-currentIn:
            if !ok {
                // The input has been closed. Don't keep trying to read it
                currentIn = nil
                // If there's nothing pending to write, we're done
                if currentItem == nil {
                    return
                }
                continue
            }

            // Were we holding something to write? Put it back.
            if currentItem != nil {
                heap.Push(&pq, currentItem)
            }

            // Put our new thing on the queue
            heap.Push(&pq, item)

            // Turn on the output queue if it's not turned on
            currentOut = out

            // Grab our best item. We know there's at least one. We just put it there.
            currentItem = heap.Pop(&pq).(*Item)

            // Write to the output
        case currentOut <- currentItem:
            // OK, we wrote. Is there anything else?
            if len(pq) > 0 {
                // Hold onto it for next time
                currentItem = heap.Pop(&pq).(*Item)
            } else {
                // Oh well, nothing to write. Is the input stream done?
                if currentIn == nil {
                    // Then we're done
                    return
                }

                // Otherwise, turn off the output stream for now.
                currentItem = nil
                currentOut = nil
            }
        }
    }
}

这是一个使用它的例子:

func main() {
    // Some items and their priorities.
    items := map[string]int{
        "banana": 3, "apple": 2, "pear": 4,
    }

    in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
    out := make(chan *Item)    // But the system will "work" for any particular values

    // Start the queuing engine!
    go queue(in, out)

    // Stick some stuff on in another goroutine
    go func() {
        i := 0
        for value, priority := range items {
            in <- &Item{
                value:    value,
                priority: priority,
                index:    i,
            }
            i++
        }
        close(in)
    }()

    // Read the results
    for item := range out {
        fmt.Printf("%.2d:%s ", item.priority, item.value)
    }
    fmt.Println()
}

请注意,如果您 运行 这个例子,每次的顺序都会有点不同。这当然是预料之中的。这取决于输入和输出通道的速度 运行.