具有 spring kafka 模板工厂实现的 Kafka 生产者
Kafka producer with spring kafka template factory implementation
我有一个简单的 REST API(方法 1),它使用 kafka-clients API 生成消息并发送到 Kafka 集群。
Spring boot rest -> producer.send (kafka-clients lib) -> kafka cluster
此外,我还有另一种实现方式(方法 2)
Spring boot rest -> producer factory implementation (single configuration spring object) -> kafka template send (spring-kafka) -> kafka cluster
我观察到方法 2 比方法 1 花费更多时间。例如,方法 1 处理一条消息花费了 40 毫秒,方法 2 花费了将近 100 毫秒。
我想使用基于生产者工厂的实现来最大程度地减少推送消息所花费的时间。关于如何调整它有什么想法吗?
实现细节如下:(生产者工厂)
/**
 * Spring configuration for the Kafka producer side: exposes the producer
 * properties, a producer factory built from them, and a {@code KafkaTemplate}
 * built on that factory.
 */
@Configuration
public class KafkaConfig {

    /** Broker list injected from the {@code bootstrap.servers} property. */
    @Value("${bootstrap.servers}")
    String bootStrapServers;

    /**
     * Assembles the producer properties.
     *
     * @return a new mutable map containing the bootstrap servers, {@code acks} set to
     *         {@code "0"}, {@code retries} set to {@code 0}, and String serializers
     *         for both key and value
     */
    @Bean
    public Map<String, Object> configs() {
        // Keys and values both use the same serializer class.
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("key.serializer", stringSerializer);
        producerProps.put("value.serializer", stringSerializer);
        producerProps.put("acks", "0");
        producerProps.put("retries", 0);
        producerProps.put("bootstrap.servers", bootStrapServers);
        return producerProps;
    }

    /**
     * Creates the producer factory from {@link #configs()}.
     *
     * @return a {@code DefaultKafkaProducerFactory} for String keys and values
     */
    @Bean
    public ProducerFactory<String, String> factory() {
        Map<String, Object> producerProps = configs();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Creates the template used to send records, backed by {@link #factory()}.
     *
     * @return a {@code KafkaTemplate} for String keys and values
     */
    @Bean
    public KafkaTemplate<String, String> template() {
        ProducerFactory<String, String> producerFactory = factory();
        return new KafkaTemplate<>(producerFactory);
    }
}
Controller :
/** Template used to publish the incoming request bodies to Kafka. */
@Autowired
private KafkaTemplate<String,String> template;

/**
 * Publishes the request body to the given topic.
 *
 * @param topicName   name of the topic to send to, taken from the request path
 * @param requestBody raw request body, sent as the record value
 * @return 200 OK with an empty body once the send has been handed to the template,
 *         or 500 with an empty body if handing it over throws
 * @throws JsonProcessingException kept in the signature for compatibility with
 *         existing callers; nothing in this method throws it
 */
public ResponseEntity<String> producer(@PathVariable String topicName, @RequestBody String requestBody) throws JsonProcessingException {
    try {
        // NOTE(review): send() hands back a future that is ignored here, so only
        // failures thrown synchronously reach the catch block — confirm whether
        // asynchronous send failures should be observed as well.
        template.send(topicName, requestBody);
    } catch (Exception ex) {
        // Report the failure to the client instead of answering 200 OK for a
        // message that was never handed to the producer.
        logger.error("Failed to send message to topic " + topicName, ex);
        return ResponseEntity.status(500).build();
    }
    return ResponseEntity.ok().build();
}
我确实看到了比我预期的更多的开销(类似于您的结果)。我会做一些分析,看看它是否可以改进。
框架总是会增加一些开销,但归根结底,与所有 Spring 项目一样,如果需要,您仍然可以直接使用更底层的 API。
/**
 * Small benchmark application that times the same number of sends through a
 * {@code KafkaTemplate} and through a raw {@code KafkaProducer} built from the
 * same configuration properties, then prints the {@code StopWatch} summary.
 */
@SpringBootApplication
public class So65791199Application {

    /** Topic that both timed phases publish to. */
    private static final String TOPIC = "so65791199";

    /** Number of records sent in each timed phase. */
    private static final int MESSAGE_COUNT = 10000;

    /** Maximum time, in seconds, to wait for a single send to complete. */
    private static final long SEND_TIMEOUT_SECONDS = 10;

    public static void main(String[] args) {
        SpringApplication.run(So65791199Application.class, args);
    }

    /**
     * Runs the two timed phases at startup.
     *
     * @param template template used for the "template" phase
     * @param pf       factory whose configuration properties are reused to build
     *                 the raw producer for the "raw producer" phase
     * @return a runner that performs both phases and prints the timing table
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ProducerFactory<String, String> pf) {
        return args -> {
            StopWatch watch = new StopWatch();

            // Untimed first send; presumably a warm-up so one-off setup cost is
            // not counted in the timed section below.
            template.send(TOPIC, "foo").get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            List<ListenableFuture<SendResult<String, String>>> futures = new LinkedList<>();
            watch.start("template");
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                futures.add(template.send(TOPIC, "foo"));
            }
            for (ListenableFuture<SendResult<String, String>> fut : futures) {
                fut.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            watch.stop();

            // try-with-resources: the producer is closed even when a send times
            // out or fails, instead of being leaked.
            try (Producer<String, String> producer = new KafkaProducer<>(pf.getConfigurationProperties())) {
                // Untimed first send for the raw producer, mirroring the template phase.
                ProducerRecord<String, String> pr = new ProducerRecord<>(TOPIC, 0, null, "foo");
                producer.send(pr).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

                watch.start("raw producer");
                List<Future<RecordMetadata>> futs = new LinkedList<>();
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    futs.add(producer.send(new ProducerRecord<>(TOPIC, 0, null, "foo")));
                }
                for (Future<RecordMetadata> futr : futs) {
                    futr.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
                watch.stop();
            }

            System.out.println(watch.prettyPrint());
        };
    }

    /**
     * Declares the benchmark topic.
     *
     * @return a topic definition with one partition and one replica
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC).partitions(1).replicas(1).build();
    }
}
StopWatch '': running time = 126595537 ns
---------------------------------------------
ns % Task name
---------------------------------------------
088742103 070% template
037853434 030% raw producer
我有一个简单的 REST API(方法 1),它使用 kafka-clients API 生成消息并发送到 Kafka 集群。
Spring boot rest -> producer.send (kafka-clients lib) -> kafka cluster
此外,我还有另一种实现方式(方法 2)
Spring boot rest -> producer factory implementation (single configuration spring object) -> kafka template send (spring-kafka) -> kafka cluster
我观察到方法 2 比方法 1 花费更多时间。例如,方法 1 处理一条消息花费了 40 毫秒,方法 2 花费了将近 100 毫秒。
我想使用基于生产者工厂的实现来最大程度地减少推送消息所花费的时间。关于如何调整它有什么想法吗?
实现细节如下:(生产者工厂)
/**
 * Spring configuration for the Kafka producer side: exposes the producer
 * properties, a producer factory built from them, and a {@code KafkaTemplate}
 * built on that factory.
 */
@Configuration
public class KafkaConfig {

    /** Broker list injected from the {@code bootstrap.servers} property. */
    @Value("${bootstrap.servers}")
    String bootStrapServers;

    /**
     * Assembles the producer properties.
     *
     * @return a new mutable map containing the bootstrap servers, {@code acks} set to
     *         {@code "0"}, {@code retries} set to {@code 0}, and String serializers
     *         for both key and value
     */
    @Bean
    public Map<String, Object> configs() {
        // Keys and values both use the same serializer class.
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("key.serializer", stringSerializer);
        producerProps.put("value.serializer", stringSerializer);
        producerProps.put("acks", "0");
        producerProps.put("retries", 0);
        producerProps.put("bootstrap.servers", bootStrapServers);
        return producerProps;
    }

    /**
     * Creates the producer factory from {@link #configs()}.
     *
     * @return a {@code DefaultKafkaProducerFactory} for String keys and values
     */
    @Bean
    public ProducerFactory<String, String> factory() {
        Map<String, Object> producerProps = configs();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Creates the template used to send records, backed by {@link #factory()}.
     *
     * @return a {@code KafkaTemplate} for String keys and values
     */
    @Bean
    public KafkaTemplate<String, String> template() {
        ProducerFactory<String, String> producerFactory = factory();
        return new KafkaTemplate<>(producerFactory);
    }
}
Controller :
/** Template used to publish the incoming request bodies to Kafka. */
@Autowired
private KafkaTemplate<String,String> template;

/**
 * Publishes the request body to the given topic.
 *
 * @param topicName   name of the topic to send to, taken from the request path
 * @param requestBody raw request body, sent as the record value
 * @return 200 OK with an empty body once the send has been handed to the template,
 *         or 500 with an empty body if handing it over throws
 * @throws JsonProcessingException kept in the signature for compatibility with
 *         existing callers; nothing in this method throws it
 */
public ResponseEntity<String> producer(@PathVariable String topicName, @RequestBody String requestBody) throws JsonProcessingException {
    try {
        // NOTE(review): send() hands back a future that is ignored here, so only
        // failures thrown synchronously reach the catch block — confirm whether
        // asynchronous send failures should be observed as well.
        template.send(topicName, requestBody);
    } catch (Exception ex) {
        // Report the failure to the client instead of answering 200 OK for a
        // message that was never handed to the producer.
        logger.error("Failed to send message to topic " + topicName, ex);
        return ResponseEntity.status(500).build();
    }
    return ResponseEntity.ok().build();
}
我确实看到了比我预期的更多的开销(类似于您的结果)。我会做一些分析,看看它是否可以改进。
框架总是会增加一些开销,但归根结底,与所有 Spring 项目一样,如果需要,您仍然可以直接使用更底层的 API。
/**
 * Small benchmark application that times the same number of sends through a
 * {@code KafkaTemplate} and through a raw {@code KafkaProducer} built from the
 * same configuration properties, then prints the {@code StopWatch} summary.
 */
@SpringBootApplication
public class So65791199Application {

    /** Topic that both timed phases publish to. */
    private static final String TOPIC = "so65791199";

    /** Number of records sent in each timed phase. */
    private static final int MESSAGE_COUNT = 10000;

    /** Maximum time, in seconds, to wait for a single send to complete. */
    private static final long SEND_TIMEOUT_SECONDS = 10;

    public static void main(String[] args) {
        SpringApplication.run(So65791199Application.class, args);
    }

    /**
     * Runs the two timed phases at startup.
     *
     * @param template template used for the "template" phase
     * @param pf       factory whose configuration properties are reused to build
     *                 the raw producer for the "raw producer" phase
     * @return a runner that performs both phases and prints the timing table
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ProducerFactory<String, String> pf) {
        return args -> {
            StopWatch watch = new StopWatch();

            // Untimed first send; presumably a warm-up so one-off setup cost is
            // not counted in the timed section below.
            template.send(TOPIC, "foo").get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            List<ListenableFuture<SendResult<String, String>>> futures = new LinkedList<>();
            watch.start("template");
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                futures.add(template.send(TOPIC, "foo"));
            }
            for (ListenableFuture<SendResult<String, String>> fut : futures) {
                fut.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
            watch.stop();

            // try-with-resources: the producer is closed even when a send times
            // out or fails, instead of being leaked.
            try (Producer<String, String> producer = new KafkaProducer<>(pf.getConfigurationProperties())) {
                // Untimed first send for the raw producer, mirroring the template phase.
                ProducerRecord<String, String> pr = new ProducerRecord<>(TOPIC, 0, null, "foo");
                producer.send(pr).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);

                watch.start("raw producer");
                List<Future<RecordMetadata>> futs = new LinkedList<>();
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    futs.add(producer.send(new ProducerRecord<>(TOPIC, 0, null, "foo")));
                }
                for (Future<RecordMetadata> futr : futs) {
                    futr.get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
                watch.stop();
            }

            System.out.println(watch.prettyPrint());
        };
    }

    /**
     * Declares the benchmark topic.
     *
     * @return a topic definition with one partition and one replica
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC).partitions(1).replicas(1).build();
    }
}
StopWatch '': running time = 126595537 ns
---------------------------------------------
ns % Task name
---------------------------------------------
088742103 070% template
037853434 030% raw producer