Goroutines 和消息去重
Goroutines and messages de-duplicatioin
所以我有一些事件队列和几个 goroutine,它们在无限循环中从相应的队列中获取事件,处理它们,并将结果发送到通道中。不同的队列可能会给你相同的事件,所以我需要确保每个事件都被发送到通道一次,并且该消息在另一个队列中的任何出现都将被忽略。我认为这更像是一个架构问题,但我不知道如何正确处理它。
我当前代码的简化版本如下。
获取和处理传入事件的 Goroutine 看起来有点像这样:
func (q *Queue) ProcessEvents(handler Handler) {
lastEvent = 0
for {
events = getEvents(lastEvent)
for _, e := range events {
if e.ID > lastEvent {
lastEvent = event.ID
}
handler.Handle(e)
}
}
}
处理程序:
type Handler struct {
c chan Event
}
func (h *Handler) Handle(event *Event) {
//event processing omitted
h.c <- event //Now it just sends a processed event into the channel no matter what.
}
我在 main() 中做
func main() {
msgc := make(chan Event)
for _, q := range queues {
go func(queue Queue) {
queue.ProcessEvents(&Handler{msgc})
}
}
}
所以你代表你现在的架构如下:
使用这种类型的解决方案,Generators 需要检查共享资源以查看是否已发出事件。这可能看起来像这样:
var hasEmmited map[string]bool
var lock sync.Mutex
func HasEmitted(event e) bool {
lock.Lock()
defer lock.Unlock()
e,ok := hasEmmited[e.ID]
return e && ok
}
func SetEmmited(event e) {
lock.Lock()
defer lock.Lock()
hasEmmited[e.ID] = true
}
这需要 locking/unlocking,即使在没有争用的最佳情况下,考虑到关键部分中完成的少量工作,这也是一个很大的开销。
在架构上做一个小改动,就像在第二张图中,一个 go-routine 可以在没有任何锁定的情况下进行过滤。
一些评论者表示,使用 go-routines 设计解决方案与设计单线程应用程序相同。我不相信是这样。
我建议查看:
Golang 相关消息:https://blog.golang.org/pipelines
一些消息处理设计模式:http://www.enterpriseintegrationpatterns.com/
企业集成模式在这里可能看起来格格不入,但它涵盖了很多在 go 中也适用的消息传递模式。
所以我有一些事件队列和几个 goroutine,它们在无限循环中从相应的队列中获取事件,处理它们,并将结果发送到通道中。不同的队列可能会给你相同的事件,所以我需要确保每个事件都被发送到通道一次,并且该消息在另一个队列中的任何出现都将被忽略。我认为这更像是一个架构问题,但我不知道如何正确处理它。
我当前代码的简化版本如下。
获取和处理传入事件的 Goroutine 看起来有点像这样:
func (q *Queue) ProcessEvents(handler Handler) {
lastEvent = 0
for {
events = getEvents(lastEvent)
for _, e := range events {
if e.ID > lastEvent {
lastEvent = event.ID
}
handler.Handle(e)
}
}
}
处理程序:
type Handler struct {
c chan Event
}
func (h *Handler) Handle(event *Event) {
//event processing omitted
h.c <- event //Now it just sends a processed event into the channel no matter what.
}
我在 main() 中做
func main() {
msgc := make(chan Event)
for _, q := range queues {
go func(queue Queue) {
queue.ProcessEvents(&Handler{msgc})
}
}
}
所以你代表你现在的架构如下:
使用这种类型的解决方案,Generators 需要检查共享资源以查看是否已发出事件。这可能看起来像这样:
var hasEmmited map[string]bool
var lock sync.Mutex
func HasEmitted(event e) bool {
lock.Lock()
defer lock.Unlock()
e,ok := hasEmmited[e.ID]
return e && ok
}
func SetEmmited(event e) {
lock.Lock()
defer lock.Lock()
hasEmmited[e.ID] = true
}
这需要 locking/unlocking,即使在没有争用的最佳情况下,考虑到关键部分中完成的少量工作,这也是一个很大的开销。
在架构上做一个小改动,就像在第二张图中,一个 go-routine 可以在没有任何锁定的情况下进行过滤。
一些评论者表示,使用 go-routines 设计解决方案与设计单线程应用程序相同。我不相信是这样。 我建议查看:
Golang 相关消息:https://blog.golang.org/pipelines
一些消息处理设计模式:http://www.enterpriseintegrationpatterns.com/
企业集成模式在这里可能看起来格格不入,但它涵盖了很多在 go 中也适用的消息传递模式。