如果我提供自定义分区器,KafkaTemplate 的 send(Topic, Key, Message) 方法会调用分区方法吗?
Will send(Topic, Key, Message) method of KafkaTemplate calls Partition method if I provide custom Partitioner?
我想知道 kafkaTemplate.send(topic, key, message) 方法是否会调用提供的自定义分区程序 partition() 方法?
嗯,KafkaTemplate
完全基于 Apache Kafka Client Producer
。最终代码如下所示:
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
key
和 topic
确实是 ProducerRecord
的一部分。
其他一切都在 KafkaProducer
和其他 Kafka Client 对象中完成。
特别是 Partitioner.partition()
是从这里调用的 KafkaProducer
:
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
而这个是 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
的一部分,它是从提到的 KafkaTemplate.doSend()
中调用的。
您可以考虑改用这个 API:
/**
* Send the data to the provided topic with the provided key and partition.
* @param topic the topic.
* @param partition the partition.
* @param timestamp the timestamp of the record.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
因此,您也可以完全控制 partition
的代码。
但很快:KafkaTemplate
不会调用 Partitioner.partition()
。
另一方面:为什么不在您的项目中尝试呢?你可能还没有带着这样的问题来找我们...
我想知道 kafkaTemplate.send(topic, key, message) 方法是否会调用提供的自定义分区程序 partition() 方法?
嗯,KafkaTemplate
完全基于 Apache Kafka Client Producer
。最终代码如下所示:
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
key
和 topic
确实是 ProducerRecord
的一部分。
其他一切都在 KafkaProducer
和其他 Kafka Client 对象中完成。
特别是 Partitioner.partition()
是从这里调用的 KafkaProducer
:
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
而这个是 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
的一部分,它是从提到的 KafkaTemplate.doSend()
中调用的。
您可以考虑改用这个 API:
/**
* Send the data to the provided topic with the provided key and partition.
* @param topic the topic.
* @param partition the partition.
* @param timestamp the timestamp of the record.
* @param key the key.
* @param data the data.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
因此,您也可以完全控制 partition
的代码。
但很快:KafkaTemplate
不会调用 Partitioner.partition()
。
另一方面:为什么不在您的项目中尝试呢?你可能还没有带着这样的问题来找我们...