Goroutines 和消息去重

Goroutines and messages de-duplicatioin

所以我有一些事件队列和几个 goroutine,它们在无限循环中从相应的队列中获取事件,处理它们,并将结果发送到通道中。不同的队列可能会给你相同的事件,所以我需要确保每个事件都被发送到通道一次,并且该消息在另一个队列中的任何出现都将被忽略。我认为这更像是一个架构问题,但我不知道如何正确处理它。

我当前代码的简化版本如下。

获取和处理传入事件的 Goroutine 看起来有点像这样:

func (q *Queue) ProcessEvents(handler Handler) {
   lastEvent = 0
   for {
       events = getEvents(lastEvent)
       for _, e := range events {
           if e.ID > lastEvent  {
                lastEvent = event.ID
           }
           handler.Handle(e)
       }
   }
}

处理程序:

type Handler struct {
    c chan Event
}

func (h *Handler) Handle(event *Event) {
    //event processing omitted
    h.c <- event //Now it just sends a processed event into the channel no matter what.
}

我在 main() 中做

func main() {
    msgc := make(chan Event)
    for _, q := range queues {
        go func(queue Queue) {
            queue.ProcessEvents(&Handler{msgc})
        }
    }
}

所以你代表你现在的架构如下:

使用这种类型的解决方案,Generators 需要检查共享资源以查看是否已发出事件。这可能看起来像这样:

var hasEmmited map[string]bool
var lock sync.Mutex

func HasEmitted(event e) bool {
   lock.Lock()
   defer lock.Unlock()
   e,ok := hasEmmited[e.ID]
   return e && ok
}

func SetEmmited(event e) {
   lock.Lock()
   defer lock.Lock()
   hasEmmited[e.ID] = true
}

这需要 locking/unlocking,即使在没有争用的最佳情况下,考虑到关键部分中完成的少量工作,这也是一个很大的开销。

在架构上做一个小改动,就像在第二张图中,一个 go-routine 可以在没有任何锁定的情况下进行过滤。

一些评论者表示,使用 go-routines 设计解决方案与设计单线程应用程序相同。我不相信是这样。 我建议查看:

Golang 相关消息:https://blog.golang.org/pipelines

一些消息处理设计模式:http://www.enterpriseintegrationpatterns.com/

企业集成模式在这里可能看起来格格不入,但它涵盖了很多在 go 中也适用的消息传递模式。