如果我提供自定义分区器,KafkaTemplate 的 send(Topic, Key, Message) 方法会调用分区方法吗?

Will send(Topic, Key, Message) method of KafkaTemplate calls Partition method if I provide custom Partitioner?

我想知道 kafkaTemplate.send(topic, key, message) 方法是否会调用提供的自定义分区程序 partition() 方法?

嗯,KafkaTemplate 完全基于 Apache Kafka Client Producer。最终代码如下所示:

producer.send(producerRecord, buildCallback(producerRecord, producer, future));

keytopic 确实是 ProducerRecord 的一部分。 其他一切都在 KafkaProducer 和其他 Kafka Client 对象中完成。 特别是 Partitioner.partition() 是从这里调用的 KafkaProducer:

 /**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

而这个是 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { 的一部分,它是从提到的 KafkaTemplate.doSend() 中调用的。

您可以考虑改用这个 API:

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

因此,您也可以完全控制 partition 的代码。

但很快:KafkaTemplate 不会调用 Partitioner.partition()

另一方面:为什么不在您的项目中尝试呢?你可能还没有带着这样的问题来找我们...