如何按 user_id 对 Kafka 中的主题进行分区?
How to make a partition of a topic in Kafka by user_id?
我正在使用 SpringBoot 构建一个 Web 应用程序后端,我必须使用 Kafka 来发送消息。
我想要一个主题,例如“testTopic”,我想在那里生成一些来自不同用户的消息,以便稍后将消息发送到不同的机器。
如果用户A向他的机器发送消息,用户B向他的机器发送消息。
我如何区分谁发送了哪条消息以及它应该到达哪台机器?
我读过有关 Kafka 主题分区的内容,但我不知道我是否在我的代码中做得很好。
这是我的话题
@Bean
public NewTopic kafkaExampleTopic() {
return TopicBuilder.name("TestTopic").partitions(1).build();
}
我正在向该主题发送数据
@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
return args -> {
kafkaTemplate.send("TestTopic", String.valueOf(MessageBuilder.withPayload("Hello kafka testTopic uno con key 1")
.setHeader(KafkaHeaders.MESSAGE_KEY, "1").build()));
kafkaTemplate.send("TestTopic", String.valueOf(MessageBuilder.withPayload("Hello kafka testTopic uno con key 2")
.setHeader(KafkaHeaders.MESSAGE_KEY, "2").build()));
};
}
这是我的听众
@KafkaListener(topics = "TestTopic", groupId = "exampleGroupId")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message
+ "from partition: " + partition);
}
非常感谢你们!
主题分区需要提前决定。
例如,如果您有数字 ID,则可以定义一个包含 10 个分区的主题,然后创建您自己的分区程序 class,它将根据每个数字的前导数字(ids 1, 10、15 等都转到分区 1)。如果您使用十六进制值(例如 UUID),主题可能有 16 个分区 (a-f, 0-9)。 Alphanumeric-lowercase - 36,依此类推。
默认情况下,Kafka 的 DefaultPartitioner 将根据主题分区的数量执行 Murmur2 哈希 modulo-d。这样,例如,id 5 和 7 可能最终位于同一分区中。根据您消费者的需求,这可能不是您想要的。
消费者是不同机器上的运行。分区应该无关紧要,除非知道同一组的消费者不能分配相同的分区(如果你只有一个分区,则每组只有一个消费者可以读取它)。
我正在使用 SpringBoot 构建一个 Web 应用程序后端,我必须使用 Kafka 来发送消息。 我想要一个主题,例如“testTopic”,我想在那里生成一些来自不同用户的消息,以便稍后将消息发送到不同的机器。
如果用户A向他的机器发送消息,用户B向他的机器发送消息。 我如何区分谁发送了哪条消息以及它应该到达哪台机器?
我读过有关 Kafka 主题分区的内容,但我不知道我是否在我的代码中做得很好。
这是我的话题
@Bean
public NewTopic kafkaExampleTopic() {
return TopicBuilder.name("TestTopic").partitions(1).build();
}
我正在向该主题发送数据
@Bean
CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
return args -> {
kafkaTemplate.send("TestTopic", String.valueOf(MessageBuilder.withPayload("Hello kafka testTopic uno con key 1")
.setHeader(KafkaHeaders.MESSAGE_KEY, "1").build()));
kafkaTemplate.send("TestTopic", String.valueOf(MessageBuilder.withPayload("Hello kafka testTopic uno con key 2")
.setHeader(KafkaHeaders.MESSAGE_KEY, "2").build()));
};
}
这是我的听众
@KafkaListener(topics = "TestTopic", groupId = "exampleGroupId")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(
"Received Message: " + message
+ "from partition: " + partition);
}
非常感谢你们!
主题分区需要提前决定。
例如,如果您有数字 ID,则可以定义一个包含 10 个分区的主题,然后创建您自己的分区程序 class,它将根据每个数字的前导数字(ids 1, 10、15 等都转到分区 1)。如果您使用十六进制值(例如 UUID),主题可能有 16 个分区 (a-f, 0-9)。 Alphanumeric-lowercase - 36,依此类推。
默认情况下,Kafka 的 DefaultPartitioner 将根据主题分区的数量执行 Murmur2 哈希 modulo-d。这样,例如,id 5 和 7 可能最终位于同一分区中。根据您消费者的需求,这可能不是您想要的。
消费者是不同机器上的运行。分区应该无关紧要,除非知道同一组的消费者不能分配相同的分区(如果你只有一个分区,则每组只有一个消费者可以读取它)。