我们如何在 kafka 中快速写入单条消息(不是批量消息)?
How we can write single message(not batch) fast in kafka?
我是 Golang 和 Kafka 的新手,我正在使用 segmentio kafka-go 连接到使用 Golang 的 Kafka 服务器。截至目前,我想在 Kafka 中推送用户的每个事件,所以我想推送单个消息(而不是批量),但是由于该库提供的写入操作对批处理或单个消息都需要相同的时间,因此需要很多时间。有什么方法可以快速编写单个消息,以便我可以在更短的时间内在 kafka 中推送百万个事件?
我已经针对单个消息和批量消息对其进行了测试,它花费的时间相同(最小为 10 毫秒)。
我对golang不是很了解。但是使用 Writer.WriteMessages 的以下函数具有同步发送选项。
快速写入(使用同步发送)实际上取决于您的网络往返时间,即向 Kafka 发送消息所花费的时间加上从 Kafka 获得确认所花费的时间。
如果您正在使用 同步发送,那么您的发送将被阻止,直到收到确认。
所以,为了让它更快,一种方式是减少确认。最好将它设置为 1(意思是,领导者已将消息写入其日志,但不会复制给追随者)。但是如果领导者宕机并且消息没有被复制,这可能会导致丢失。
因此,您可以将其设置为 acks=all
并更改主题上的 min.insync.replicas=2
。值越小,您的 send()
returns 速度越快,下一条消息推送到 Kafka 的速度就越快。
我认为你的问题只是 WriterConfig。
例如,如果您的配置类似于 segmentio/kafka-go 文档中的示例:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
})
您可以尝试设置批量大小和批量超时:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
BatchTimeout: 10 * time.Millisecond,
})
这是因为 kafka-go 默认等待 1 秒,直到批处理达到最大大小,默认情况下为 100 条消息,正如我们在 code.
中看到的
希望对你有帮助。
更新:请注意,一条一条地发送消息会减慢进程。
例如:批量发送 100 条消息在我的电脑上花费了 0.0107s。一条一条发送同样的 100 条消息用了 0.0244 秒。
我是 Golang 和 Kafka 的新手,我正在使用 segmentio kafka-go 连接到使用 Golang 的 Kafka 服务器。截至目前,我想在 Kafka 中推送用户的每个事件,所以我想推送单个消息(而不是批量),但是由于该库提供的写入操作对批处理或单个消息都需要相同的时间,因此需要很多时间。有什么方法可以快速编写单个消息,以便我可以在更短的时间内在 kafka 中推送百万个事件?
我已经针对单个消息和批量消息对其进行了测试,它花费的时间相同(最小为 10 毫秒)。
我对golang不是很了解。但是使用 Writer.WriteMessages 的以下函数具有同步发送选项。
快速写入(使用同步发送)实际上取决于您的网络往返时间,即向 Kafka 发送消息所花费的时间加上从 Kafka 获得确认所花费的时间。
如果您正在使用 同步发送,那么您的发送将被阻止,直到收到确认。 所以,为了让它更快,一种方式是减少确认。最好将它设置为 1(意思是,领导者已将消息写入其日志,但不会复制给追随者)。但是如果领导者宕机并且消息没有被复制,这可能会导致丢失。
因此,您可以将其设置为 acks=all
并更改主题上的 min.insync.replicas=2
。值越小,您的 send()
returns 速度越快,下一条消息推送到 Kafka 的速度就越快。
我认为你的问题只是 WriterConfig。
例如,如果您的配置类似于 segmentio/kafka-go 文档中的示例:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
})
您可以尝试设置批量大小和批量超时:
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "topic-A",
Balancer: &kafka.LeastBytes{},
BatchSize: 1,
BatchTimeout: 10 * time.Millisecond,
})
这是因为 kafka-go 默认等待 1 秒,直到批处理达到最大大小,默认情况下为 100 条消息,正如我们在 code.
中看到的希望对你有帮助。
更新:请注意,一条一条地发送消息会减慢进程。 例如:批量发送 100 条消息在我的电脑上花费了 0.0107s。一条一条发送同样的 100 条消息用了 0.0244 秒。