以线程安全方式并行执行多台机器

Executing multiple machines in parallel in thread safe manner

我有一个主机名列表,我正在使用 ExecutorService 并行执行它以收集每个主机名的所有指标。然后我正在制作一个列表,其中包含所有主机名的所有指标相关信息,方法是未来迭代每个主机名。由于我并行执行多个主机名,所以我不确定这段代码是否线程安全。

这是我并行执行多个 HOSTNAMES 的主要代码:

final Flows typeOfFlow = Flows.TREE;

List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
    machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
        @Override
        public MachineMetrics call() throws Exception {
            MachineMetrics machineMetrics = new MachineMetrics();
            String url = "http://" + machine + ":8080/text";
            Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
            machineMetrics.setMachineName(machine.split("\.")[0]);
            machineMetrics.setDatacenter(TestUtils.findDatacenter(machine).get().name().toLowerCase());
            machineMetrics.setMetrics(metrics);
            return machineMetrics;
        }
    }));
}
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
    try {
        metricsList.add(future.get());
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here
    }
}
// now print all the hostnames metrics information
System.out.println(metricsList);

下面是我的 getMetrics 代码,在上面的代码所在的 class 中:

private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
    Map<String, String> holder = new HashMap<String, String>();
    try {
        RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
        String response = restTemplate.getForObject(url, String.class);
        Matcher m = PATTERN.matcher(response);
        while (m.find()) {
            String key = m.group(1).trim();
            String value = m.group(2).trim();
            holder.put(key, value);
        }
    } catch (Exception ex) {
        // log here
    }

    return TestUtils.process(holder);
}

下面是我在 TestUtils class 中的 findDatacenter 代码:

public static Optional<Datacenter> findDatacenter(final String hostname) {
    if (!TestUtils.isEmpty(hostname)) {
        for (Datacenter dc : DC_LIST) {
            String namepart = "." + dc.name().toLowerCase() + ".";
            if (hostname.indexOf(namepart) >= 0) {
                return Optional.of(dc);
            }
        }
    }
    return Optional.absent();
}

下面是我在 TestUtils class 中的 process 方法:

public static Map<String, String> process(final Map<String, String> holder) {
    Map<String, String> tempMap = new HashMap<>();

    for (Map.Entry<String, String> entry : holder.entrySet()) {
        if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
            continue;
        }
        String currentKey = entry.getKey();
        String currentValue = entry.getValue();
        StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");

        String count = tokenizer.nextToken().trim();
        String avgData = tokenizer.nextToken().trim();
        String medianData = tokenizer.nextToken().trim();
        String n95data = tokenizer.nextToken().trim();
        String n99data = tokenizer.nextToken().trim();

        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);

        holder.remove(currentKey);
    }

    tempMap.putAll(holder);

    return tempMap;
}

下面是我在 TestUtils class 中的 generateKey 方法:

private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
    StringBuilder newKey = new StringBuilder();

    if (hasMiss) {
        newKey.append(currentKey).append(constant);
    } else {
        String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
        String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());
        newKey.append(firstPart).append(CACHE).append(secondPart).append(constant);
    }

    return newKey.toString();
}

下面是我的 MachineMetrics class:

public class MachineMetrics {

    private String machineName;
    private String datacenter;
    private Map<String, String> metrics;

    // normal setters and getters here
}

我上面的代码线程安全吗?由于某些竞争条件或线程安全问题,我可能会看到错误的结果吗?

看起来不错。您的方法是 stateless. Also you use immutable objects 作为方法参数。所以你不会有线程安全问题。

一个备注:

for (Future<MachineMetrics> future : machineFutureList) {
    try {
        metricsList.add(future.get());
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here
    }
}

get Waits if necessary for the computation to complete, and then retrieves its result. So if first call was slow, you will not retrieve other results. Use isDone 检查您是否可以在不等待的情况下调用 get