Kafka connect(单机)向多个分区写入数据

Kafka connect (standalone) writing data to multiple partitions

我正在尝试使用 Kafka connect 以独立模式写入数据。我正在写入数据的主题是有多个分区。但是,数据仅写入其中一个分区。当我启动多个消费者控制台时,数据只打印到其中一个。另一个消费者控制台只有在第一个关闭后才能获取任何数据。我无法弄清楚我需要在配置文件中进行哪些更改才能使其写入多个分区。

这是standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084

连接文件-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group

现在我正在使用以下命令 运行 连接器:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

使用以下内容启动消费者控制台:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group

它只将数据打印到其中一个消费者控制台。但是,如果我使用生产者控制台而不是 Kafka 连接来写入消息,那么我可以在多个消费者(以循环方式)上看到消息,这是应该的方式。但是使用 Kafka connect,它只是将所有数据写入单个分区,同一组中的其他消费者必须闲置。需要更改什么才能使其写入循环系统中的所有分区?

此答案适用于 Apache Kafka 0.10.2.1,但不一定适用于未来版本。

如您所知,文件源连接器生成带有 null 键和 null 主题分区号的消息。这意味着 Kafka Connect 的生产者使用它的 partitioner 分配主题分区,对于具有空键的消息,default partitioner 将尝试将消息循环到可用 个分区。

但是,您 运行 陷入了 JSON 转换器的怪癖之一,它通过 key.convertervalue.converter 属性在 standalone.properties 文件中配置:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

当 JSON 转换器配置为启用模式时,JSON 表示包含值周围的信封,以便键或值同时包含 模式和有效负载:

{
    "schema": ...,
    "payload": ...
}

您的 standalone.properties 文件将密钥转换器配置为启用 模式 ,因此即使连接器使用 null 密钥和 null 生成消息模式,JSON 转换器(启用模式)总是将它们包装在一个信封中。因此,每条消息的密钥将是:

{
    "schema": null,
    "payload": null
}

生产者的默认分区器将总是将这些相同的键散列到相同的分区

要更改行为,请编辑您的 standalone.properties 文件并将 key.converter.schemas.enable 属性 更改为 false:

key.converter.schemas.enable=false

您可以选择将 value.converter.schemas.enable 属性 更改为 false 以更改 value 的写入方式以不将值包装在信封并包含架构:

value.converter.schemas.enable=false

这也会影响转换器如何处理 空值 ,某些连接器会在删除具有特定键的源实体时生成空值。例如,当从源数据库中删除一行时,一些更改数据捕获连接器会执行此操作。这对 log compacted topics 非常有效,因为每条消息都代表键控实体的最后已知状态,并且因为 null value 对应于 tombstone 记录告诉 Kafka 在该墓碑之前具有相同密钥的所有消息都可以从日志中删除。但是,如果将值转换器配置为启用模式的 JSON 转换器 将永远不会输出 null 消息值,因此日志压缩永远不会删除墓碑消息。这是一个小问题,但需要注意。

如果您想在 JSON 中对您的键和值进行编码,那么您可能不需要或不想要这些模式,因此可以为它们的键和值打开 schemas.enable JSON 个转换器。

对于那些真正使用模式的人,请考虑使用 Confluent's Schema Registry and the Avro Converters. Not only are the encoded messages significantly smaller (due to the Avro encoding rather than JSON string encoding), the encoded messages include the ID of the Avro schema and thus allow you to evolve your message schemas over time,而不必协调升级您的生产者和消费者以使用完全相同的模式。各种优点都有!