Flume:使用 Kafka 通道将事件路由到适当的主题分区
Flume: Routing events to the proper topic partition with Kafka channel
在 Flume 中,当使用 Kafka 通道时,有没有办法影响将事件发送到哪个分区?
使用 Kafka sink,key
FlumeEvent header 显然用于选择分区,但我找不到任何文档关于 Kafka channel.
的分区
渠道不用担心分区问题。因为 channels 是写的,而 channel 是消费消息的,所以不需要对消息进行分区。这就是 flume-kafka-channel 编写消息的方式。
new KeyedMessage<String, byte[]>(topic.get(), null,
batchUUID, event)
但是,如果您的主题有多个分区,则缺少密钥会导致消息被喷射到可用分区中。
如果你想更多地控制消息在分区中的分布方式,那么你可能想研究 Kafka 的自定义分区程序概念,这样你就可以创建一个 class 实现 org.apache.kafka.clients.producer.Partitioner
接口,并且将 partitioner.class 属性 的值设置为 class 的名称,并确保自定义分区程序在 class 路径中可用。这样你就可以在发布之前控制每条消息,你可以决定消息应该去哪个分区。您可以在 flume 频道配置中设置 属性 kafka.partitioner.class
以便它被拾取
Flume 的 Kafka 通道不支持像 KafkaSink 那样将事件 header 映射到分区键 out-of-the-box。
但是,对其进行修改并不太复杂。由于我不确定我是否可以分享代码,我只会给出指示:
- 为 header 的名称添加一个配置键,它将映射到分区键
- 在内部 class KafkaTransaction 中,将成员类型
serializedEvents
中的 byte[]
替换为还可以为每个事件保存 String
键的内容(或者一个内在的 class,甚至是一个 Kafka KeyedMessage<String, byte[]>
)
- 在方法
KafkaTransaction.doPut(Event event)
中,从 header 中检索密钥并与序列化消息一起存储在 serializedEvents
中
- 在方法
KafkaTransaction.doCommit()
中,使用与序列化事件一起存储的密钥而不是 batchUUID
。
注意事务中的事件将不再保证由通道消费者端的单个 KafkaChannel 实例处理,因此您必须检查它与您的用例兼容(关于交易大小等)。
在 Flume 中,当使用 Kafka 通道时,有没有办法影响将事件发送到哪个分区?
使用 Kafka sink,key
FlumeEvent header 显然用于选择分区,但我找不到任何文档关于 Kafka channel.
渠道不用担心分区问题。因为 channels 是写的,而 channel 是消费消息的,所以不需要对消息进行分区。这就是 flume-kafka-channel 编写消息的方式。
new KeyedMessage<String, byte[]>(topic.get(), null,
batchUUID, event)
但是,如果您的主题有多个分区,则缺少密钥会导致消息被喷射到可用分区中。
如果你想更多地控制消息在分区中的分布方式,那么你可能想研究 Kafka 的自定义分区程序概念,这样你就可以创建一个 class 实现 org.apache.kafka.clients.producer.Partitioner
接口,并且将 partitioner.class 属性 的值设置为 class 的名称,并确保自定义分区程序在 class 路径中可用。这样你就可以在发布之前控制每条消息,你可以决定消息应该去哪个分区。您可以在 flume 频道配置中设置 属性 kafka.partitioner.class
以便它被拾取
Flume 的 Kafka 通道不支持像 KafkaSink 那样将事件 header 映射到分区键 out-of-the-box。
但是,对其进行修改并不太复杂。由于我不确定我是否可以分享代码,我只会给出指示:
- 为 header 的名称添加一个配置键,它将映射到分区键
- 在内部 class KafkaTransaction 中,将成员类型
serializedEvents
中的byte[]
替换为还可以为每个事件保存String
键的内容(或者一个内在的 class,甚至是一个 KafkaKeyedMessage<String, byte[]>
) - 在方法
KafkaTransaction.doPut(Event event)
中,从 header 中检索密钥并与序列化消息一起存储在serializedEvents
中 - 在方法
KafkaTransaction.doCommit()
中,使用与序列化事件一起存储的密钥而不是batchUUID
。
注意事务中的事件将不再保证由通道消费者端的单个 KafkaChannel 实例处理,因此您必须检查它与您的用例兼容(关于交易大小等)。