如何在多线程应用程序连续失败后将主机名添加到阻止列表?
How to add hostname to block list after consecutive failures in multithreading application?
我在我的代码中使用 Callable,它将被多个线程调用,如下所示。截至目前,每当抛出任何 RestClientException
时,我都会将 hostname
添加到 blockList.
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
ResponseEntity<String> response = null;
// construct what are the hostnames I can call basis on user id
List<String> hostnames = some_code_here;
for (String hostname : hostnames) {
// If host name is null or host name is in block list, skip sending request to this host
if (DataUtils.isEmpty(hostname) || DataMapping.isBlocked(hostname)) {
continue;
}
try {
String url = createURL(hostname);
response = restTemplate.exchange(url, HttpMethod.GET, key.getEntity(), String.class);
// some code here to return the response if successful
} catch (HttpClientErrorException ex) {
// log exception
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
} catch (HttpServerErrorException ex) {
// log exception
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
} catch (RestClientException ex) {
// I don't want to add it to block list instantly.
// If same hostname as failed five times consecutively, then only add it
DataMapping.blockHost(hostname);
}
}
return new DataResponse(DataErrorEnum.SERVER_UNAVAILABLE, DataStatusEnum.ERROR);
}
}
下面是我在 DataMapping
class:
private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts =
new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());
public static boolean isBlocked(String hostName) {
return blockedHosts.get().containsKey(hostName);
}
public static void blockHost(String hostName) {
blockedHosts.get().put(hostName, hostName);
}
问题陈述:-
现在,正如您在 call
方法中看到的那样,我在 hostname
抛出 RestClientException
时立即阻止它,这可能是不正确的。我需要查看某个特定的 hostname
是否已连续五次抛出 RestClientException
,然后仅通过调用此行 DataMapping.blockHost(hostname);
将此 hostname
添加到 blockList,否则不要添加它到阻止列表。
最有效和最好的方法是什么?最多,我总共会有70-100台独特的机器。
在这种情况下,我的调用方法将从多个线程调用,因此我需要确保我正确地为每个 hostname
保持计数,以防它们抛出 RestClientException
.
编辑:
我在 DataMapping
class 中也有以下方法:
我有一个后台线程,每 2 分钟运行一次,它会替换整个集合,因为我的服务提供真实数据,无论是否真的阻止了任何主机名。我想我确实需要 atomic reference
来替换整套。
我也在代码中本地添加阻止功能,因为我可能会在 2 分钟后知道哪台机器被阻止,所以如果可能的话最好事先知道。
// this is being updated from my background thread which runs every 2 minutes
public static void replaceBlockedHosts(List<String> hostNames) {
ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
for (String hostName : hostNames) {
newBlockedHosts.put(hostName, hostName);
}
blockedHosts.set(newBlockedHosts);
}
我会将每个主机与一个 AtomicInteger
相关联,该 AtomicInteger
在每个 RestClientException
上递增。在成功调用以强制执行 "five consecutive times" 约束时,此整数将设置为零。代码看起来像这样。
private final ConcurrentHashMap<String, AtomicInteger> failedCallCount = new ConcurrentHashMap<>();
void call() {
try {
String url = createURL(host);
// make rest call
resetFailedCallCount(host);
// ...
} catch (RestClientException ex) {
registerFailedCall(host);
if (shouldBeBlocked(host)) {
DataMapping.blockHost(host);
}
}
}
private boolean shouldBeBlocked(String hostName) {
AtomicInteger count = failedCallCount.getOrDefault(hostName, new AtomicInteger());
return count.get() >= 5;
}
private void registerFailedCall(String hostName) {
AtomicInteger newValue = new AtomicInteger();
AtomicInteger val = failedCallCount.putIfAbsent(hostName, newValue);
if (val == null) {
val = newValue;
}
if (val.get() < 5) {
val.incrementAndGet();
}
}
private void resetFailedCallCount(String hostName) {
AtomicInteger count = failedCallCount.get(hostName);
if (count != null) {
count.set(0);
}
}
这是无锁的(至少在我们自己的代码中)并且非常高效。但是它容易受到某些竞争条件的影响。最值得注意的是计数可以变得大于 5。但是,这应该不是问题,因为主机无论如何都被阻止并且计数不用于其他任何事情。
在您的 DataMapping
class 中维护一个静态寄存器,例如 - public static ConcurrentHashMap<String, Integer> toBeBlockedHostName = new ConcurrentHashMap<String, Integer>();
。然后像这样使用它你的 FOR 循环:
for (String hostname : hostnames) {
// .. some code here
//After ensuring everything is success and no RestClientException, i.e. can be last line of your TRY block...
DataMapping.toBeBlockedHostName.remove("Whosebug6361");
catch (RestClientException ex) {
if(DataMapping.toBeBlockedHostName.get("Whosebug6361") == null){
DataMapping.toBeBlockedHostName.put("Whosebug6361", new Integer(1));
} else{
if(DataMapping.toBeBlockedHostName.get("Whosebug6361") == 5){ //Don't hard code 5, have it from some property file after defining as retryThreshold...
System.out.println("Blocking threshold reached, block the hostname...");
DataMapping.blockHost(hostname);
} else{
DataMapping.toBeBlockedHostName.put("Whosebug6361", (toBeBlockedHostName.get("Whosebug6361") + 1));
}
}
}
请注意::对于ConcurrentHashMap
,即使所有操作都是线程安全的,检索操作也不需要锁定。
请注意,连续5次重试失败后,您将屏蔽主机名,但如果您再次解除屏蔽,则应清除寄存器。
P.S.: HashMap有合适的getter和setter.
我在我的代码中使用 Callable,它将被多个线程调用,如下所示。截至目前,每当抛出任何 RestClientException
时,我都会将 hostname
添加到 blockList.
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
ResponseEntity<String> response = null;
// construct what are the hostnames I can call basis on user id
List<String> hostnames = some_code_here;
for (String hostname : hostnames) {
// If host name is null or host name is in block list, skip sending request to this host
if (DataUtils.isEmpty(hostname) || DataMapping.isBlocked(hostname)) {
continue;
}
try {
String url = createURL(hostname);
response = restTemplate.exchange(url, HttpMethod.GET, key.getEntity(), String.class);
// some code here to return the response if successful
} catch (HttpClientErrorException ex) {
// log exception
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
} catch (HttpServerErrorException ex) {
// log exception
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
} catch (RestClientException ex) {
// I don't want to add it to block list instantly.
// If same hostname as failed five times consecutively, then only add it
DataMapping.blockHost(hostname);
}
}
return new DataResponse(DataErrorEnum.SERVER_UNAVAILABLE, DataStatusEnum.ERROR);
}
}
下面是我在 DataMapping
class:
private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts =
new AtomicReference<ConcurrentHashMap<String, String>>(new ConcurrentHashMap<String, String>());
public static boolean isBlocked(String hostName) {
return blockedHosts.get().containsKey(hostName);
}
public static void blockHost(String hostName) {
blockedHosts.get().put(hostName, hostName);
}
问题陈述:-
现在,正如您在 call
方法中看到的那样,我在 hostname
抛出 RestClientException
时立即阻止它,这可能是不正确的。我需要查看某个特定的 hostname
是否已连续五次抛出 RestClientException
,然后仅通过调用此行 DataMapping.blockHost(hostname);
将此 hostname
添加到 blockList,否则不要添加它到阻止列表。
最有效和最好的方法是什么?最多,我总共会有70-100台独特的机器。
在这种情况下,我的调用方法将从多个线程调用,因此我需要确保我正确地为每个 hostname
保持计数,以防它们抛出 RestClientException
.
编辑:
我在 DataMapping
class 中也有以下方法:
我有一个后台线程,每 2 分钟运行一次,它会替换整个集合,因为我的服务提供真实数据,无论是否真的阻止了任何主机名。我想我确实需要 atomic reference
来替换整套。
我也在代码中本地添加阻止功能,因为我可能会在 2 分钟后知道哪台机器被阻止,所以如果可能的话最好事先知道。
// this is being updated from my background thread which runs every 2 minutes
public static void replaceBlockedHosts(List<String> hostNames) {
ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
for (String hostName : hostNames) {
newBlockedHosts.put(hostName, hostName);
}
blockedHosts.set(newBlockedHosts);
}
我会将每个主机与一个 AtomicInteger
相关联,该 AtomicInteger
在每个 RestClientException
上递增。在成功调用以强制执行 "five consecutive times" 约束时,此整数将设置为零。代码看起来像这样。
private final ConcurrentHashMap<String, AtomicInteger> failedCallCount = new ConcurrentHashMap<>();
void call() {
try {
String url = createURL(host);
// make rest call
resetFailedCallCount(host);
// ...
} catch (RestClientException ex) {
registerFailedCall(host);
if (shouldBeBlocked(host)) {
DataMapping.blockHost(host);
}
}
}
private boolean shouldBeBlocked(String hostName) {
AtomicInteger count = failedCallCount.getOrDefault(hostName, new AtomicInteger());
return count.get() >= 5;
}
private void registerFailedCall(String hostName) {
AtomicInteger newValue = new AtomicInteger();
AtomicInteger val = failedCallCount.putIfAbsent(hostName, newValue);
if (val == null) {
val = newValue;
}
if (val.get() < 5) {
val.incrementAndGet();
}
}
private void resetFailedCallCount(String hostName) {
AtomicInteger count = failedCallCount.get(hostName);
if (count != null) {
count.set(0);
}
}
这是无锁的(至少在我们自己的代码中)并且非常高效。但是它容易受到某些竞争条件的影响。最值得注意的是计数可以变得大于 5。但是,这应该不是问题,因为主机无论如何都被阻止并且计数不用于其他任何事情。
在您的 DataMapping
class 中维护一个静态寄存器,例如 - public static ConcurrentHashMap<String, Integer> toBeBlockedHostName = new ConcurrentHashMap<String, Integer>();
。然后像这样使用它你的 FOR 循环:
for (String hostname : hostnames) {
// .. some code here
//After ensuring everything is success and no RestClientException, i.e. can be last line of your TRY block...
DataMapping.toBeBlockedHostName.remove("Whosebug6361");
catch (RestClientException ex) {
if(DataMapping.toBeBlockedHostName.get("Whosebug6361") == null){
DataMapping.toBeBlockedHostName.put("Whosebug6361", new Integer(1));
} else{
if(DataMapping.toBeBlockedHostName.get("Whosebug6361") == 5){ //Don't hard code 5, have it from some property file after defining as retryThreshold...
System.out.println("Blocking threshold reached, block the hostname...");
DataMapping.blockHost(hostname);
} else{
DataMapping.toBeBlockedHostName.put("Whosebug6361", (toBeBlockedHostName.get("Whosebug6361") + 1));
}
}
}
请注意::对于ConcurrentHashMap
,即使所有操作都是线程安全的,检索操作也不需要锁定。
请注意,连续5次重试失败后,您将屏蔽主机名,但如果您再次解除屏蔽,则应清除寄存器。
P.S.: HashMap有合适的getter和setter.