在 sarama 中创建 Kafka 主题
Creating Kafka topic in sarama
是否可以在 sarama 中创建 kafka 主题?
我知道 java API 可以让您创建主题,但我在 sarama 中找不到有关如何创建主题的任何信息。
如果可能的话,我应该使用 api 的示例或解释会很棒
提前致谢
EDIT :下面是一个仍然有效的旧答案,但那时 sarama admin api 正在开发中。从那时起,ClusterAdmin api 已经走了很长一段路,今天应该被视为解决这个问题的首选方法。如果您希望在 2020 年以后解决此问题,请参阅下面的其他 2 个答案。
可以使用 sarama 来管理 Kafka 中的主题。我正在编写一个用于管理 Kafka 主题的 Terraform 提供程序,并使用 sarama 在后端完成繁重的工作。
您需要使用 sarama.Broker api 来执行此操作。例如
// Set broker configuration
broker := sarama.NewBroker("localhost:9092")
// Additional configurations. Check sarama doc for more info
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
// Open broker connection with configs defined above
broker.Open(config)
// check if the connection was OK
connected, err := broker.Connected()
if err != nil {
log.Print(err.Error())
}
log.Print(connected)
// Setup the Topic details in CreateTopicRequest struct
topic := "blah25s"
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(1)
topicDetail.ReplicationFactor = int16(1)
topicDetail.ConfigEntries = make(map[string]*string)
topicDetails := make(map[string]*sarama.TopicDetail)
topicDetails[topic] = topicDetail
request := sarama.CreateTopicsRequest{
Timeout: time.Second * 15,
TopicDetails: topicDetails,
}
// Send request to Broker
response, err := broker.CreateTopics(&request)
// handle errors if any
if err != nil {
log.Printf("%#v", &err)
}
t := response.TopicErrors
for key, val := range t {
log.Printf("Key is %s", key)
log.Printf("Value is %#v", val.Err.Error())
log.Printf("Value3 is %#v", val.ErrMsg)
}
log.Printf("the response is %#v", response)
// close connection to broker
broker.Close()
您可以在 github 查看工作代码。记得在 运行 代码之前启动 kafka broker 并导入所有 golang 依赖。
最好直接使用 : https://github.com/Shopify/sarama/blob/master/admin.go 而不是直接连接到代理。
这处理了很多情况,例如:
- 您可以为集群配置添加多个代理地址。
- 识别哪个经纪商作为控制者是自动完成的。
确实,在较新版本的 Sarama 中,您可以使用 ClusterAdmin 创建主题。您可以在下面找到示例代码:
package main
import (
"github.com/Shopify/sarama" // Sarama 1.22.0
"log"
)
func main() {
brokerAddrs := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
if err != nil {
log.Fatal("Error while creating cluster admin: ", err.Error())
}
defer func() { _ = admin.Close() }()
err = admin.CreateTopic("topic.test.1", &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false)
if err != nil {
log.Fatal("Error while creating topic: ", err.Error())
}
}
是否可以在 sarama 中创建 kafka 主题? 我知道 java API 可以让您创建主题,但我在 sarama 中找不到有关如何创建主题的任何信息。 如果可能的话,我应该使用 api 的示例或解释会很棒 提前致谢
EDIT :下面是一个仍然有效的旧答案,但那时 sarama admin api 正在开发中。从那时起,ClusterAdmin api 已经走了很长一段路,今天应该被视为解决这个问题的首选方法。如果您希望在 2020 年以后解决此问题,请参阅下面的其他 2 个答案。
可以使用 sarama 来管理 Kafka 中的主题。我正在编写一个用于管理 Kafka 主题的 Terraform 提供程序,并使用 sarama 在后端完成繁重的工作。
您需要使用 sarama.Broker api 来执行此操作。例如
// Set broker configuration
broker := sarama.NewBroker("localhost:9092")
// Additional configurations. Check sarama doc for more info
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
// Open broker connection with configs defined above
broker.Open(config)
// check if the connection was OK
connected, err := broker.Connected()
if err != nil {
log.Print(err.Error())
}
log.Print(connected)
// Setup the Topic details in CreateTopicRequest struct
topic := "blah25s"
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(1)
topicDetail.ReplicationFactor = int16(1)
topicDetail.ConfigEntries = make(map[string]*string)
topicDetails := make(map[string]*sarama.TopicDetail)
topicDetails[topic] = topicDetail
request := sarama.CreateTopicsRequest{
Timeout: time.Second * 15,
TopicDetails: topicDetails,
}
// Send request to Broker
response, err := broker.CreateTopics(&request)
// handle errors if any
if err != nil {
log.Printf("%#v", &err)
}
t := response.TopicErrors
for key, val := range t {
log.Printf("Key is %s", key)
log.Printf("Value is %#v", val.Err.Error())
log.Printf("Value3 is %#v", val.ErrMsg)
}
log.Printf("the response is %#v", response)
// close connection to broker
broker.Close()
您可以在 github 查看工作代码。记得在 运行 代码之前启动 kafka broker 并导入所有 golang 依赖。
最好直接使用 : https://github.com/Shopify/sarama/blob/master/admin.go 而不是直接连接到代理。
这处理了很多情况,例如:
- 您可以为集群配置添加多个代理地址。
- 识别哪个经纪商作为控制者是自动完成的。
确实,在较新版本的 Sarama 中,您可以使用 ClusterAdmin 创建主题。您可以在下面找到示例代码:
package main
import (
"github.com/Shopify/sarama" // Sarama 1.22.0
"log"
)
func main() {
brokerAddrs := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
if err != nil {
log.Fatal("Error while creating cluster admin: ", err.Error())
}
defer func() { _ = admin.Close() }()
err = admin.CreateTopic("topic.test.1", &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false)
if err != nil {
log.Fatal("Error while creating topic: ", err.Error())
}
}