
Executing multiple machines in parallel in thread safe manner

我有一个主机名列表,我正在使用 ExecutorService 并行执行它以收集每个主机名的所有指标。然后我正在制作一个列表,其中包含所有主机名的所有指标相关信息,方法是未来迭代每个主机名。由于我并行执行多个主机名,所以我不确定这段代码是否线程安全。

这是我并行执行多个 HOSTNAMES 的主要代码:

final Flows typeOfFlow = Flows.TREE;

List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
    machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
        public MachineMetrics call() throws Exception {
            MachineMetrics machineMetrics = new MachineMetrics();
            String url = "http://" + machine + ":8080/text";
            Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
            return machineMetrics;
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
    try {
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here
// now print all the hostnames metrics information

下面是我的 getMetrics 代码,在上面的代码所在的 class 中:

private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
    Map<String, String> holder = new HashMap<String, String>();
    try {
        RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
        String response = restTemplate.getForObject(url, String.class);
        Matcher m = PATTERN.matcher(response);
        while (m.find()) {
            String key = m.group(1).trim();
            String value = m.group(2).trim();
            holder.put(key, value);
    } catch (Exception ex) {
        // log here

    return TestUtils.process(holder);

下面是我在 TestUtils class 中的 findDatacenter 代码:

public static Optional<Datacenter> findDatacenter(final String hostname) {
    if (!TestUtils.isEmpty(hostname)) {
        for (Datacenter dc : DC_LIST) {
            String namepart = "." + dc.name().toLowerCase() + ".";
            if (hostname.indexOf(namepart) >= 0) {
                return Optional.of(dc);
    return Optional.absent();

下面是我在 TestUtils class 中的 process 方法:

public static Map<String, String> process(final Map<String, String> holder) {
    Map<String, String> tempMap = new HashMap<>();

    for (Map.Entry<String, String> entry : holder.entrySet()) {
        if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
        String currentKey = entry.getKey();
        String currentValue = entry.getValue();
        StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");

        String count = tokenizer.nextToken().trim();
        String avgData = tokenizer.nextToken().trim();
        String medianData = tokenizer.nextToken().trim();
        String n95data = tokenizer.nextToken().trim();
        String n99data = tokenizer.nextToken().trim();

        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
        tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);



    return tempMap;

下面是我在 TestUtils class 中的 generateKey 方法:

private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
    StringBuilder newKey = new StringBuilder();

    if (hasMiss) {
    } else {
        String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
        String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());

    return newKey.toString();

下面是我的 MachineMetrics class:

public class MachineMetrics {

    private String machineName;
    private String datacenter;
    private Map<String, String> metrics;

    // normal setters and getters here


看起来不错。您的方法是 stateless. Also you use immutable objects 作为方法参数。所以你不会有线程安全问题。


for (Future<MachineMetrics> future : machineFutureList) {
    try {
    } catch (InterruptedException | ExecutionException ex) {
        // log exception here

get Waits if necessary for the computation to complete, and then retrieves its result. So if first call was slow, you will not retrieve other results. Use isDone 检查您是否可以在不等待的情况下调用 get