如何在具有不同配置的 Kafka 主题之间分发消息?
How to distribute messages between Kafka topics with different configuration?
我正在寻找一种在两个 Kafka 主题之间分发消息的方法。在原始主题中,我有 20 个分区,每个分区有 1000000 条消息。我想要一个包含 1000 个分区的新主题,并在新的更宽的分区范围内传播消息。
1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)
是否可以在 Kafka 中做到这一点(通过主题镜像或其他技术)?
您可以使用 Kafka 附带的 MirrorMaker(版本 1)。该工具主要用于将数据从一个数据中心复制到另一个数据中心。它建立在主题名称在两个集群中保持相同的假设之上。
但是,您可以提供自定义的 MessageHandler
重命名主题。
package org.xxx.java;
import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* An example implementation of MirrorMakerMessageHandler that allows to rename topic.
*/
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
private final String newName;
public TopicRenameHandler(String newName) {
this.newName = newName;
}
public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
}
}
我在 pom.xml
文件中使用了以下依赖项
<properties>
<kafka.version>2.5.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
编译上面的代码并确保将 class 添加到 CLASSPATH
export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
现在,连同一些基本的consumer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest
和producer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-producer
你可以调用kafka-mirror-maker
如下
kafka-mirror-maker --consumer.config /path/to/consumer.properties \
--producer.config /path/to/producer.properties \
--num.streams 1 \
--whitelist="topicToBeRenamed" \
--message.handler org.xxx.java.TopicRenameHandler \
--message.handler.args "newTopicName"
使用此方法请注意以下两个注意事项:
- 由于您计划更改分区数,因此新主题中的消息顺序可能与旧主题不同。默认情况下,消息在 Kafka 中按密钥进行分区。
- 使用 MirrorMaker 不会复制旧主题中的现有偏移量,而是开始编写新的偏移量。因此,旧主题的偏移量与新主题的偏移量之间(几乎)没有任何关系。
我正在寻找一种在两个 Kafka 主题之间分发消息的方法。在原始主题中,我有 20 个分区,每个分区有 1000000 条消息。我想要一个包含 1000 个分区的新主题,并在新的更宽的分区范围内传播消息。
1T -> 20P -> 1000000 messages per partition (total 20m/topic)
2T -> 1000P -> 20000 messages per partition (total 20m/topic)
是否可以在 Kafka 中做到这一点(通过主题镜像或其他技术)?
您可以使用 Kafka 附带的 MirrorMaker(版本 1)。该工具主要用于将数据从一个数据中心复制到另一个数据中心。它建立在主题名称在两个集群中保持相同的假设之上。
但是,您可以提供自定义的 MessageHandler
重命名主题。
package org.xxx.java;
import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* An example implementation of MirrorMakerMessageHandler that allows to rename topic.
*/
public class TopicRenameHandler implements MirrorMaker.MirrorMakerMessageHandler {
private final String newName;
public TopicRenameHandler(String newName) {
this.newName = newName;
}
public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
return Collections.singletonList(new ProducerRecord<byte[], byte[]>(newName, record.partition(), record.key(), record.value()));
}
}
我在 pom.xml
文件中使用了以下依赖项
<properties>
<kafka.version>2.5.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
编译上面的代码并确保将 class 添加到 CLASSPATH
export CLASSPATH=$CLASSPATH:/.../target/MirrorMakerRenameTopics-1.0.jar
现在,连同一些基本的consumer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-consumer
group.id=mirror-maker-rename-topic
auto.offset.reset=earliest
和producer.properties
bootstrap.servers=localhost:9092
client.id=mirror-maker-producer
你可以调用kafka-mirror-maker
如下
kafka-mirror-maker --consumer.config /path/to/consumer.properties \
--producer.config /path/to/producer.properties \
--num.streams 1 \
--whitelist="topicToBeRenamed" \
--message.handler org.xxx.java.TopicRenameHandler \
--message.handler.args "newTopicName"
使用此方法请注意以下两个注意事项:
- 由于您计划更改分区数,因此新主题中的消息顺序可能与旧主题不同。默认情况下,消息在 Kafka 中按密钥进行分区。
- 使用 MirrorMaker 不会复制旧主题中的现有偏移量,而是开始编写新的偏移量。因此,旧主题的偏移量与新主题的偏移量之间(几乎)没有任何关系。