事件处理中的死锁

Deadlock in event processing

所以我有一个用于事件处理的通道,这个通道上的主服务器 goroutine select 并在收到的每个事件上调用事件处理程序:

evtCh := make(chan Event)
// server loop:
for !quit {
    select {
    case e := <- evtCh:
        handleEvent(e)
        break
    case quit := <-quitCh:
        //finish
}

// for send a new event to processing
func addEvent(e Event) {
    evtCh <- e
}

handleEvent 将调用事件类型上已注册的处理程序。我有 func registerEventHandler(typ EventType, func(Event)) 来处理寄存器。该程序将支持用户编写扩展,这意味着他们可以注册自己的处理程序来处理事件。

现在问题出现在用户的事件处理程序中,他们可能会通过调用 addEvent 向服务器发送新事件,这将导致服务器挂起,因为事件处理程序本身是在服务器的主循环(在 for 循环中)。

如何优雅地处理这种情况?由切片建模的队列是个好主意吗?

this will cause the server to hang since the event handler itself is called in the context of the server's main loop

主循环不应该在调用 handleEvent 时阻塞,避免这种情况的最常见方法是使用工作协程池。这是一个快速 未经测试的 示例:

type Worker struct {
    id int
    ch chan Event
    quit chan bool
}

func (w *Worker) start {
    for {
        select {
            case e := <- w.ch:
                fmt.Printf("Worker %d called\n", w.id)
                //handle event
                break;
            case <- w.quit:
                return
        }
    }
}


ch := make(chan Event, 100)
quit := make(chan bool, 0)

// Start workers
for i:=0; i<10; i++{
    worker := &Worker{i,ch,quit}
    go worker.start()
}

// 
func addEvent (e Event) {
    ch <- e
}

完成后,close(quit) 杀死所有工人。

编辑:来自以下评论:

what is the main loop looks like in this case?

视情况而定。如果你有固定数量的事件,你可以使用 WaitGroup,像这样:

type Worker struct {
    id int
    ch chan Event
    quit chan bool
    wg *sync.WaitGroup
}

func (w *Worker) start {
    for {
        select {
            case e := <- w.ch:
                //handle event
                wg.Done()

                break;
            case <- w.quit:
                return
        }
    }
}

func main() {
    ch := make(chan Event, 100)
    quit := make(chan bool, 0)

    numberOfEvents := 100

    wg := &sync.WaitGroup{}
    wg.Add(numberOfEvents)

    // start workers
    for i:=0; i<10; i++{
        worker := &Worker{i,ch,quit,wg}
        go worker.start()
    }


    wg.Wait() // Blocks until all events are handled
}

如果事先不知道事件的数量,您可以在退出频道上阻塞:

<- quit

一旦另一个 goroutine 关闭通道,您的程序也将终止。

要使事情更加异步,您可以

  • 增加事件通道容量

    evtCh := make(chan Event, 10)

  • 异步调用 handleEvent(e)

    go handleEvent(e)

  • 在处理程序中异步添加事件

    go addEvent(e)

或者,如果您希望以确定的顺序处理事件,您可以直接在处理程序中调用 handleEvent(e) 而不是 addEvent(e)