是否可以使用 MirrorMaker2 复制没有别名前缀的 kafka 主题

Is it possible to replicate kafka topics without alias prefix with MirrorMaker2

我正在尝试在 2 个集群之间设置复制,但不想更改主题名称。例如,如果我有一个名为 "some_topic" 的主题,它会自动复制到 "cluster1.some_topic",我很确定这可以完成,但还没有找到正确的配置来更改此

我当前的配置"mirrormaker2.properties"

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

供参考:

我能够使用此设置删除前缀:

"replication.policy.separator": ""
"source.cluster.alias": "",
"target.cluster.alias": "",

如果您的情况需要别名设置,我知道您应该使用其他复制策略 class。默认情况下使用 DefaultReplicationPolicy class (https://kafka.apache.org/24/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html)

我认为上面的回答不合适。

在Mirror Maker 2.0中,如果你想保持主题不被修改,你必须实现ReplicationPolicy。

你可以参考DefaultReplicationPolicy.class,然后覆盖formatRemoteTopic(),之后你必须删除sourceClusterAlias + separator。最后在mm2.properties

配置replication.policy.class

我定义了MigrationReplicationPolicy.class

replication.policy.class = org.apache.kafka.connect.mirror.MigrationReplicationPolicy

你应该看看MirrorClientConfig,class,我知道你会明白

为了 'disable' 主题前缀 并同时正确镜像主题属性 ,我必须提供一个定制的复制策略,它也覆盖了 topicSource 方法。否则非默认主题属性(例如,"cleanup.policy=compact")没有被镜像,即使在重新启动镜像制造商之后也是如此。

这是对我有用的完整程序:

  1. 将以下定制的复制策略编译打包成一个.jar文件(完整的源代码可以找到here):
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}
  1. 将 .jar 复制到 ${KAFKA_HOME/libs 目录
  2. 通过在 ${KAFKA_HOME}/config/mm2.properties:
  3. 中设置 replication.policy.class 属性 来配置 Mirror Maker 2 使用该复制策略
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy

设法使用 Kafka ConfluentINC 连接器映像版本 5.4.2 推送复制 属性是:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
target.cluster.alias= 
replication.factor=3
tasks.max=3
topics=.*
source.cluster.alias= 
target.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
replication.policy.separator= 
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source.cluster.bootstrap.servers=<broker1>,<broker2>,<broker3>
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

1) 在source.cluster.alias、replication.policy.separator、target.cluster.alias这3个参数后面留下space。

2) 在 TARGET Kafka 上设置此镜像连接器,而不是在源上(仅执行拉取)

此外,您还可以使用 Conductor 或 Kafka Connector UI landoop 映像 - landoop/kafka-connect-ui

这仍处于测试场景中,但看起来很有希望。

我正在尝试在 2 个集群之间设置复制,但需要在两个集群中使用相同的主题名称,而不需要在连接镜像中为 maker.properties.

提供别名

默认情况下,复制的主题根据源集群别名重命名。

    Source --> Target
    topic-1 --> source.topic-1

您可以通过在连接器属性文件下将以下属性设置为空白来避免主题被重命名。默认情况下,replication.policy.separator 属性 是句点,然后通过将其与 source.cluster.alias 一起设置为空白,目标主题将与源主题同名。

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=

从Kafka 3.0.0开始,设置

就足够了
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

此外,marcin-wieloch 的回答 中的 PrefixlessReplicationPolicy 不再适用于 3.0.0 (NullPointerException)。