如何让每条消息处理成功?

How to make every message process successfully?

下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:


Channel-1 和 Channel-2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。

Goroutine-1 从 kafka 主题中读取一条消息,在验证消息后将其消息有效负载抛出到 Channel-1。

Goroutine-2 从 Channel-1 读取并处理 payload,并将处理后的 payload 抛到 Channel-2。

Goroutine-3从Channel-2读取并将处理后的payload封装到http数据包中,向其他服务执行http请求(使用http客户端)。

上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务尚未准备好接受来自 Go-routine3 的 http 请求(http 客户端超时),处理失败,因此,上面服务丢失了该消息(已从 Kafka 主题中读取)。


Goroutine-1 当前订阅了来自 Kafka 的消息没有发送给 Kafka 的确认(通知 Goroutine-3 成功处理了特定消息)

正确性高于性能。


如何保证每条消息都处理成功?

为确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理不成功的情况——一般情况下,需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息扔回专用的 Kafka 重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次之后处理失败 - 你将消息扔到 DLQ(=死信队列)。
您可以在这里阅读更多内容:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/

例如,通过新的 Channel-3 将 Goroutine-3 的反馈添加到 Goroutine-1。 Goroutine-1 将阻塞,直到它从 Channel-3 获得确认。

// in gorouting 1
channel1 <- data
select {
    case <-channel3:
    case <-ctx.Done(): // or smth else to prevent deadlock 
}
...
// in gorouting 3
data := <-channel2
for {
    if err := sendData(data); err == nil {
        break
    }
}
channel3<-struct{}{}