Flume:使用 Kafka 通道将事件路由到适当的主题分区

Flume: Routing events to the proper topic partition with Kafka channel

在 Flume 中,当使用 Kafka 通道时,有没有办法影响将事件发送到哪个分区?

使用 Kafka sinkkey FlumeEvent header 显然用于选择分区,但我找不到任何文档关于 Kafka channel.

的分区

渠道不用担心分区问题。因为 channels 是写的,而 channel 是消费消息的,所以不需要对消息进行分区。这就是 flume-kafka-channel 编写消息的方式。

new KeyedMessage<String, byte[]>(topic.get(), null,
              batchUUID, event)

但是,如果您的主题有多个分区,则缺少密钥会导致消息被喷射到可用分区中。

如果你想更多地控制消息在分区中的分布方式,那么你可能想研究 Kafka 的自定义分区程序概念,这样你就可以创建一个 class 实现 org.apache.kafka.clients.producer.Partitioner 接口,并且将 partitioner.class 属性 的值设置为 class 的名称,并确保自定义分区程序在 class 路径中可用。这样你就可以在发布之前控制每条消息,你可以决定消息应该去哪个分区。您可以在 flume 频道配置中设置 属性 kafka.partitioner.class 以便它被拾取

Flume 的 Kafka 通道不支持像 KafkaSink 那样将事件 header 映射到分区键 out-of-the-box。

但是,对其进行修改并不太复杂。由于我不确定我是否可以分享代码,我只会给出指示:

  1. 为 header 的名称添加一个配置键,它将映射到分区键
  2. 在内部 class KafkaTransaction 中,将成员类型 serializedEvents 中的 byte[] 替换为还可以为每个事件保存 String 键的内容(或者一个内在的 class,甚至是一个 Kafka KeyedMessage<String, byte[]>)
  3. 在方法 KafkaTransaction.doPut(Event event) 中,从 header 中检索密钥并与序列化消息一起存储在 serializedEvents
  4. 在方法 KafkaTransaction.doCommit() 中,使用与序列化事件一起存储的密钥而不是 batchUUID

注意事务中的事件将不再保证由通道消费者端的单个 KafkaChannel 实例处理,因此您必须检查它与您的用例兼容(关于交易大小等)。