Multi-threading in Java: field not incrementing as expected
I'm having trouble creating a single instance and sharing it across all threads. Here is my code.
Here is the main method:
public static void main(String... args) throws IOException, ClassNotFoundException {
MainApp mainApp = new MainApp();
mainApp.init();
mainApp.multiThread();
}
Here is init():
private void init() {
HttpClient httpClient = HttpClientBuilder.create()
.setMaxConnTotal(TOTAL_CONNECTION)
.setMaxConnPerRoute(PER_ROUTE)
.build();
final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
restTemplate = new RestTemplate(requestFactory);
}
TOTAL_CONNECTION is 100 and PER_ROUTE is 100.
Here is multiThread():
private void multiThread() {
MaterializationChecker materializationChecker = new MaterializationChecker(restTemplate, new TotalInfo(0, 0));
materializationChecker.check();
}
Here is the TotalInfo class:
public class TotalInfo {
@Getter private int total;
@Getter private int aboveThreshold;
public TotalInfo(int total, int aboveThreshold) {
this.total = total;
this.aboveThreshold = aboveThreshold;
}
protected synchronized void increaseAboveThreshold() {
aboveThreshold++;
}
protected synchronized void increaseTotal() {
total++;
}
}
Here is the materializationChecker.check() method (threadCount is set to 10, taskCount is set to 100):
public boolean check() {
try {
executor = Executors.newFixedThreadPool(threadCount);
completionService = new ExecutorCompletionService<>(executor);
submit(taskCount);
destroy();
System.out.println("Check finished -> OK !!!");
} catch (Exception e) {
System.out.println("exception when process - {}" + e);
}
return true;
}
private void submit(int taskCount) throws InterruptedException, ExecutionException {
for (int i = 0; i < taskCount; i++) {
completionService.submit(new MaterializationCallable(totalInfo));
}
int doneNum = 0;
MaterializationCallable materializationCallable;
Future<MaterializationCallable> future;
long averageLatencyOfAllAverages = 0L, minLatencyOfAllMins = Long.MAX_VALUE, maxLatencyOfAllMaxs = Long.MIN_VALUE;
while ((future = this.completionService.take()) != null) {
materializationCallable = future.get();
doneNum++;
System.out.println("Task " + doneNum + " done.");
averageLatencyOfAllAverages += materializationCallable.getLatencies().get(0);
minLatencyOfAllMins = Math.min(minLatencyOfAllMins, materializationCallable.getLatencies().get(1));
maxLatencyOfAllMaxs = Math.max(maxLatencyOfAllMaxs, materializationCallable.getLatencies().get(2));
if (doneNum >= taskCount) break;
}
System.out.println("----\naverageLatencyOfAllAverages = " + averageLatencyOfAllAverages/taskCount + " milliseconds\nminLatencyOfAllMins = " + minLatencyOfAllMins
+ " ms\nmaxLatencyOfAllMaxs = " + maxLatencyOfAllMaxs + " ms");
System.out.println("total requests: " + totalInfo.getTotal() + ", total aboveThreshold: " + totalInfo.getAboveThreshold() + ", ratio (aboveThreshold/total): " + (totalInfo.getAboveThreshold()/totalInfo.getTotal()));
System.out.println("all tasks have been done.");
}
private void destroy() {
if (this.executor != null && !executor.isShutdown()) {
System.out.println("Shutdown and wait for all worker threads to be terminated.");
this.executor.shutdownNow();
while (!this.executor.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Occurred InterruptedException : {}" + e.getMessage());
}
System.out.println("Shutdown -> OK !!!");
}
}
}
Here is the code for the MaterializationCallable class:
public class MaterializationCallable implements Callable<MaterializationCallable> {
public static final int DURATION = 30;
private final TotalInfo totalInfo;
@Getter private List<Long> latencies;
public MaterializationCallable(TotalInfo totalInfo) {
this.latencies = new ArrayList<>();
this.totalInfo = totalInfo;
}
@Override
public MaterializationCallable call() throws Exception {
long totalLatency = 0;
long maxLatency = Long.MIN_VALUE;
long minLatency = Long.MAX_VALUE;
totalInfo.increaseTotal();
for (int i = 0; i < itemIds.size(); i++){
restTemplate.getForObject(endpoint, byte[].class);
if (i != 0) {
long oneLatency = receiveLatency + desiralizeLatency;
totalLatency += oneLatency;
if (minLatency > oneLatency) {
minLatency = oneLatency;
}
if (maxLatency < oneLatency) {
maxLatency = oneLatency;
}
long threshold = TimeUnit.MILLISECONDS.toMillis(DURATION);
if (oneLatency > threshold) {
totalInfo.increaseAboveThreshold();
System.out.println("[] This request went over threshold: " + threshold + " ms, and took " + oneLatency + " ms to finish, its endpoint = " + endpoint);
}
}
}
latencies.add(average);
latencies.add(minLatency);
latencies.add(maxLatency);
System.out.println("Thread " + Thread.currentThread().getId() + " is done.");
return this;
}
}
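My question is:
At the end of the materializationChecker.check() method, totalInfo.getTotal() is only 100 rather than 1000. I initialized a thread pool with 10 threads and submitted 100 tasks, so why aren't the fields of the totalInfo object incremented 1000 times?
What went wrong?
Please help me understand this.
Thanks a lot!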
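That's because you only submitted 100 tasks.
Your code is written to increment the TotalInfo value by 1 for each submitted task: increaseTotal() is called once at the top of call(), and call() runs once per task. The fact that your executor has 10 threads has nothing to do with how the value of TotalInfo is computed.
Having 10 threads only allows the executor to run 10 tasks concurrently, nothing more.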
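To illustrate the point, here is a minimal, self-contained sketch rather than the asker's actual code. PoolSizeDemo, Counter, and runTasks are hypothetical names introduced only for this example; Counter stands in for TotalInfo, and each task increments it exactly once, as call() does in the question. The final count tracks the number of submitted tasks and does not change with the pool size.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSizeDemo {

    // Hypothetical stand-in for TotalInfo: a counter guarded by synchronized methods.
    static class Counter {
        private int total;

        synchronized void increaseTotal() {
            total++;
        }

        synchronized int getTotal() {
            return total;
        }
    }

    // Submits taskCount tasks to a fixed pool of threadCount threads.
    // Each task increments the shared counter exactly once.
    public static int runTasks(int threadCount, int taskCount) {
        Counter counter = new Counter();
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> counter.increaseTotal());
        }
        // shutdown() lets already-submitted tasks finish, unlike shutdownNow().
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return counter.getTotal();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(1, 100));   // prints 100
        System.out.println(runTasks(10, 100));  // prints 100
        System.out.println(runTasks(10, 1000)); // prints 1000
    }
}
```

Changing threadCount only changes how many tasks run at the same time; changing taskCount is what changes the total.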