如何让每条消息处理成功?
How to make every message process successfully?
下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:
Channel-1 和 Channel-2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。
Goroutine-1 从 kafka 主题中读取一条消息,在验证消息后将其消息有效负载抛出到 Channel-1。
Goroutine-2 从 Channel-1 读取并处理 payload,并将处理后的 payload 抛到 Channel-2。
Goroutine-3从Channel-2读取并将处理后的payload封装到http数据包中,向其他服务执行http请求(使用http客户端)。
上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务尚未准备好接受来自 Go-routine3 的 http 请求(http 客户端超时),处理失败,因此,上面服务丢失了该消息(已从 Kafka 主题中读取)。
Goroutine-1 当前订阅了来自 Kafka 的消息没有发送给 Kafka 的确认(通知 Goroutine-3 成功处理了特定消息)
正确性高于性能。
如何保证每条消息都处理成功?
为确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理不成功的情况——一般情况下,需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息扔回专用的 Kafka 重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次之后处理失败 - 你将消息扔到 DLQ(=死信队列)。
您可以在这里阅读更多内容:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
例如,通过新的 Channel-3 将 Goroutine-3 的反馈添加到 Goroutine-1。 Goroutine-1 将阻塞,直到它从 Channel-3 获得确认。
// in gorouting 1
channel1 <- data
select {
case <-channel3:
case <-ctx.Done(): // or smth else to prevent deadlock
}
...
// in gorouting 3
data := <-channel2
for {
if err := sendData(data); err == nil {
break
}
}
channel3<-struct{}{}
下面是一个包含 3 个 Go 例程的服务,用于处理来自 Kafka 的消息:
Channel-1 和 Channel-2 是 Go 中的无缓冲数据通道。通道就像一个排队机制。
Goroutine-1 从 kafka 主题中读取一条消息,在验证消息后将其消息有效负载抛出到 Channel-1。
Goroutine-2 从 Channel-1 读取并处理 payload,并将处理后的 payload 抛到 Channel-2。
Goroutine-3从Channel-2读取并将处理后的payload封装到http数据包中,向其他服务执行http请求(使用http客户端)。
上述流程中的漏洞:在我们的例子中,由于服务之间的网络连接不良或远程服务尚未准备好接受来自 Go-routine3 的 http 请求(http 客户端超时),处理失败,因此,上面服务丢失了该消息(已从 Kafka 主题中读取)。
Goroutine-1 当前订阅了来自 Kafka 的消息没有发送给 Kafka 的确认(通知 Goroutine-3 成功处理了特定消息)
正确性高于性能。
如何保证每条消息都处理成功?
为确保正确性,您需要在处理成功完成后提交(=确认)消息。
对于处理不成功的情况——一般情况下,需要自己实现重试机制。
这应该特定于您的用例,但通常您将消息扔回专用的 Kafka 重试主题(您创建),添加睡眠并再次处理消息。如果在 x 次之后处理失败 - 你将消息扔到 DLQ(=死信队列)。
您可以在这里阅读更多内容:
https://eng.uber.com/reliable-reprocessing/
https://www.confluent.io/blog/error-handling-patterns-in-kafka/
例如,通过新的 Channel-3 将 Goroutine-3 的反馈添加到 Goroutine-1。 Goroutine-1 将阻塞,直到它从 Channel-3 获得确认。
// in gorouting 1
channel1 <- data
select {
case <-channel3:
case <-ctx.Done(): // or smth else to prevent deadlock
}
...
// in gorouting 3
data := <-channel2
for {
if err := sendData(data); err == nil {
break
}
}
channel3<-struct{}{}