为 kafkatemplate 异步响应创建的最大异步线程数是多少
What is the max number of async threads created for kafkatemplate async response
"A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors."
假设我的 CPU 有 2 个内核。那么,ForkJoinPool创建的最大线程数是4?
假设我正在执行一个异步操作,其中 returns 一个使用默认 Forkpool 的循环(比如 10k)操作中的未来对象...那么 Forkpool 将创建多少线程?
List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<ListenableFuture<SendResult<String, String>>>();
future = kafkaTemplate.send(topicName, message);
cf.add(future);
i++;
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
syso("sent success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed");
}
});
并且,在其他一些 class 中,我正在检查所有未来是否已完成:
for (ListenableFuture<SendResult<String, String>> m : myFutures) {
m.get();
}
没有额外的线程;期货在生产者的 I/O 线程上完成。
这是一个显示回调的测试...
@SpringBootApplication
public class So61415751Application {
private static final Logger LOG = LoggerFactory.getLogger(So61415751Application.class);
public static void main(String[] args) {
SpringApplication.run(So61415751Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
template.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
LOG.info(recordMetadata.toString());
}
});
return args -> {
IntStream.range(0, 9).forEach(i -> template.send("so61415751", "foo" + i));
LOG.info("Sent");
Thread.sleep(10_000);
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61415751").partitions(1).replicas(1).build();
}
}
spring.kafka.producer.properties.linger.ms=3000
#logging.level.org.springframework.kafka=debug
logging.level.org.apache.kafka=debug
结果
2020-04-24 17:27:46.282 INFO 96084 --- [ main] com.example.demo.So61415751Application : Sent
...
3 second linger
...
2020-04-24 17:27:49.299 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@63
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@64
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@65
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@66
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@67
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@68
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@69
2020-04-24 17:27:49.301 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@70
2020-04-24 17:27:49.301 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@71
(调用ProducerListener
的线程也完成了future)。
"A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors."
假设我的 CPU 有 2 个内核。那么,ForkJoinPool创建的最大线程数是4?
假设我正在执行一个异步操作,其中 returns 一个使用默认 Forkpool 的循环(比如 10k)操作中的未来对象...那么 Forkpool 将创建多少线程?
List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<ListenableFuture<SendResult<String, String>>>();
future = kafkaTemplate.send(topicName, message);
cf.add(future);
i++;
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
syso("sent success");
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed");
}
});
并且,在其他一些 class 中,我正在检查所有未来是否已完成:
for (ListenableFuture<SendResult<String, String>> m : myFutures) {
m.get();
}
没有额外的线程;期货在生产者的 I/O 线程上完成。
这是一个显示回调的测试...
@SpringBootApplication
public class So61415751Application {
private static final Logger LOG = LoggerFactory.getLogger(So61415751Application.class);
public static void main(String[] args) {
SpringApplication.run(So61415751Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
template.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
LOG.info(recordMetadata.toString());
}
});
return args -> {
IntStream.range(0, 9).forEach(i -> template.send("so61415751", "foo" + i));
LOG.info("Sent");
Thread.sleep(10_000);
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61415751").partitions(1).replicas(1).build();
}
}
spring.kafka.producer.properties.linger.ms=3000
#logging.level.org.springframework.kafka=debug
logging.level.org.apache.kafka=debug
结果
2020-04-24 17:27:46.282 INFO 96084 --- [ main] com.example.demo.So61415751Application : Sent
...
3 second linger
...
2020-04-24 17:27:49.299 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@63
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@64
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@65
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@66
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@67
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@68
2020-04-24 17:27:49.300 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@69
2020-04-24 17:27:49.301 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@70
2020-04-24 17:27:49.301 INFO 96084 --- [ad | producer-1] com.example.demo.So61415751Application : so61415751-0@71
(调用ProducerListener
的线程也完成了future)。