以线程安全方式并行执行多台机器
Executing multiple machines in parallel in thread safe manner
我有一个主机名列表,我正在使用 ExecutorService
并行执行它以收集每个主机名的所有指标。然后我正在制作一个列表,其中包含所有主机名的所有指标相关信息,方法是未来迭代每个主机名。由于我并行执行多个主机名,所以我不确定这段代码是否线程安全。
这是我并行执行多个 HOSTNAMES
的主要代码:
final Flows typeOfFlow = Flows.TREE;
List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
@Override
public MachineMetrics call() throws Exception {
MachineMetrics machineMetrics = new MachineMetrics();
String url = "http://" + machine + ":8080/text";
Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
machineMetrics.setMachineName(machine.split("\.")[0]);
machineMetrics.setDatacenter(TestUtils.findDatacenter(machine).get().name().toLowerCase());
machineMetrics.setMetrics(metrics);
return machineMetrics;
}
}));
}
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
// now print all the hostnames metrics information
System.out.println(metricsList);
下面是我的 getMetrics
代码,在上面的代码所在的 class 中:
private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
Map<String, String> holder = new HashMap<String, String>();
try {
RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
String response = restTemplate.getForObject(url, String.class);
Matcher m = PATTERN.matcher(response);
while (m.find()) {
String key = m.group(1).trim();
String value = m.group(2).trim();
holder.put(key, value);
}
} catch (Exception ex) {
// log here
}
return TestUtils.process(holder);
}
下面是我在 TestUtils
class 中的 findDatacenter
代码:
public static Optional<Datacenter> findDatacenter(final String hostname) {
if (!TestUtils.isEmpty(hostname)) {
for (Datacenter dc : DC_LIST) {
String namepart = "." + dc.name().toLowerCase() + ".";
if (hostname.indexOf(namepart) >= 0) {
return Optional.of(dc);
}
}
}
return Optional.absent();
}
下面是我在 TestUtils
class 中的 process
方法:
public static Map<String, String> process(final Map<String, String> holder) {
Map<String, String> tempMap = new HashMap<>();
for (Map.Entry<String, String> entry : holder.entrySet()) {
if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
continue;
}
String currentKey = entry.getKey();
String currentValue = entry.getValue();
StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");
String count = tokenizer.nextToken().trim();
String avgData = tokenizer.nextToken().trim();
String medianData = tokenizer.nextToken().trim();
String n95data = tokenizer.nextToken().trim();
String n99data = tokenizer.nextToken().trim();
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);
holder.remove(currentKey);
}
tempMap.putAll(holder);
return tempMap;
}
下面是我在 TestUtils
class 中的 generateKey
方法:
private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
StringBuilder newKey = new StringBuilder();
if (hasMiss) {
newKey.append(currentKey).append(constant);
} else {
String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());
newKey.append(firstPart).append(CACHE).append(secondPart).append(constant);
}
return newKey.toString();
}
下面是我的 MachineMetrics
class:
public class MachineMetrics {
private String machineName;
private String datacenter;
private Map<String, String> metrics;
// normal setters and getters here
}
我上面的代码线程安全吗?由于某些竞争条件或线程安全问题,我可能会看到错误的结果吗?
看起来不错。您的方法是 stateless. Also you use immutable objects 作为方法参数。所以你不会有线程安全问题。
一个备注:
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
get Waits if necessary for the computation to complete, and then retrieves its result.
So if first call was slow, you will not retrieve other results. Use isDone 检查您是否可以在不等待的情况下调用 get
。
我有一个主机名列表,我正在使用 ExecutorService
并行执行它以收集每个主机名的所有指标。然后我正在制作一个列表,其中包含所有主机名的所有指标相关信息,方法是未来迭代每个主机名。由于我并行执行多个主机名,所以我不确定这段代码是否线程安全。
这是我并行执行多个 HOSTNAMES
的主要代码:
final Flows typeOfFlow = Flows.TREE;
List<Future<MachineMetrics>> machineFutureList = new ArrayList<>();
for (final String machine : HOSTNAMES) {
machineFutureList.add(executorService.submit(new Callable<MachineMetrics>() {
@Override
public MachineMetrics call() throws Exception {
MachineMetrics machineMetrics = new MachineMetrics();
String url = "http://" + machine + ":8080/text";
Map<String, String> metrics = getMetrics(machine, url, typeOfFlow);
machineMetrics.setMachineName(machine.split("\.")[0]);
machineMetrics.setDatacenter(TestUtils.findDatacenter(machine).get().name().toLowerCase());
machineMetrics.setMetrics(metrics);
return machineMetrics;
}
}));
}
List<MachineMetrics> metricsList = new ArrayList<>();
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
// now print all the hostnames metrics information
System.out.println(metricsList);
下面是我的 getMetrics
代码,在上面的代码所在的 class 中:
private Map<String, String> getMetrics(final String machine, final String url, final Flows flowType) {
Map<String, String> holder = new HashMap<String, String>();
try {
RestTemplate restTemplate = RestTemplateClient.getInstance().getClient();
String response = restTemplate.getForObject(url, String.class);
Matcher m = PATTERN.matcher(response);
while (m.find()) {
String key = m.group(1).trim();
String value = m.group(2).trim();
holder.put(key, value);
}
} catch (Exception ex) {
// log here
}
return TestUtils.process(holder);
}
下面是我在 TestUtils
class 中的 findDatacenter
代码:
public static Optional<Datacenter> findDatacenter(final String hostname) {
if (!TestUtils.isEmpty(hostname)) {
for (Datacenter dc : DC_LIST) {
String namepart = "." + dc.name().toLowerCase() + ".";
if (hostname.indexOf(namepart) >= 0) {
return Optional.of(dc);
}
}
}
return Optional.absent();
}
下面是我在 TestUtils
class 中的 process
方法:
public static Map<String, String> process(final Map<String, String> holder) {
Map<String, String> tempMap = new HashMap<>();
for (Map.Entry<String, String> entry : holder.entrySet()) {
if (!entry.getKey().startsWith("calls_") && !entry.getValue().contains("|")) {
continue;
}
String currentKey = entry.getKey();
String currentValue = entry.getValue();
StringTokenizer tokenizer = new StringTokenizer(currentValue, "|");
String count = tokenizer.nextToken().trim();
String avgData = tokenizer.nextToken().trim();
String medianData = tokenizer.nextToken().trim();
String n95data = tokenizer.nextToken().trim();
String n99data = tokenizer.nextToken().trim();
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), COUNT), count);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), AVG_IN_MS), avgData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), MEDIAN_IN_MS), medianData);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N95_IN_MS), n95data);
tempMap.put(generateKey(currentKey, currentKey.contains(MISS), N99_IN_MS), n99data);
holder.remove(currentKey);
}
tempMap.putAll(holder);
return tempMap;
}
下面是我在 TestUtils
class 中的 generateKey
方法:
private static String generateKey(final String currentKey, final boolean hasMiss, final String constant) {
StringBuilder newKey = new StringBuilder();
if (hasMiss) {
newKey.append(currentKey).append(constant);
} else {
String firstPart = currentKey.substring(0, currentKey.indexOf("_"));
String secondPart = currentKey.substring(currentKey.lastIndexOf("_") + 1, currentKey.length());
newKey.append(firstPart).append(CACHE).append(secondPart).append(constant);
}
return newKey.toString();
}
下面是我的 MachineMetrics
class:
public class MachineMetrics {
private String machineName;
private String datacenter;
private Map<String, String> metrics;
// normal setters and getters here
}
我上面的代码线程安全吗?由于某些竞争条件或线程安全问题,我可能会看到错误的结果吗?
看起来不错。您的方法是 stateless. Also you use immutable objects 作为方法参数。所以你不会有线程安全问题。
一个备注:
for (Future<MachineMetrics> future : machineFutureList) {
try {
metricsList.add(future.get());
} catch (InterruptedException | ExecutionException ex) {
// log exception here
}
}
get Waits if necessary for the computation to complete, and then retrieves its result.
So if first call was slow, you will not retrieve other results. Use isDone 检查您是否可以在不等待的情况下调用 get
。