无法在回退方法中从父线程访问 InheritableThreadLocal 对象
Not able to access InheritableThreadLocal object from parent thread in fall back method
我有一个 InheritableThreadLocal<ConcurrentHashMap<String, Object>> 线程变量,它在请求经过过滤器时被初始化,并在其中设置了一些 transaction_id。
现在在服务层,我通过 CompletableFuture 发起了 10 个不同的 API 调用。每个 API 服务类都有一个 execute 方法,它使用 RestTemplate 进行 API 调用。我在 execute 方法上加了 @HystrixCommand 注解。
execute 方法是 void 类型,但它将 API 响应放在 InheritableThreadLocal 对象中。
问题在于:当 API 调用失败、Hystrix 转而调用 FallBackMethod 时,我把错误响应放进了 InheritableThreadLocal,但无法把这个错误响应发送给客户端。
ThreadLocalUtil.class
/**
 * Static helper around a per-thread data map used to carry request-scoped values.
 *
 * <p>The map is held in an {@link InheritableThreadLocal}. It must be bound with
 * {@link #initThreadLocals()} before values are written and released with
 * {@link #clearThreadLocalDataMap()} when the request is finished.
 */
public class ThreadLocalUtil {

    /** Holder of the data map bound to the current thread; never reassigned. */
    private static final InheritableThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal =
            new InheritableThreadLocal<>();

    /**
     * Stores {@code value} under {@code key} in the current thread's data map.
     * A {@code null} value is ignored.
     *
     * @param key   entry key
     * @param value entry value; nothing is stored when it is {@code null}
     * @throws NullPointerException if a non-null value is written while no data map is bound
     *                              to the current thread (same exception type as before, now
     *                              with an explanatory message)
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = java.util.Objects.requireNonNull(
                transmittableThreadLocal.get(),
                "No data map is bound to thread '" + Thread.currentThread().getName()
                        + "'; call initThreadLocals() first");
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value, or {@code null} when the key is absent or no data map is bound
     *         to the current thread
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        // Previously this dereferenced a possibly-null map and failed with a bare NPE.
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Unbinds the data map from the current thread. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value (may be {@code null} when the key is absent), or the string
     *         {@code "-1"} when no data map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh data map to the current thread, pre-populated with a {@code REQUEST_ID}
     * entry derived from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        ConcurrentHashMap<String, Object> dataForDataMap = new ConcurrentHashMap<String, Object>();
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        dataForDataMap.put("REQUEST_ID", requestId);
        transmittableThreadLocal.set(dataForDataMap);
    }
}
CommonFilter.class
@Component
@Order(1)
public class CommonFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
try {
ThreadLocalUtil.initThreadLocals();
filterChain.doFilter(request, response);
} catch (Exception e) {
if (e instanceof ServletException) {
throw (ServletException) e;
}
} finally {
ThreadLocalUtil.clearThreadLocalDataMap();
}
}
EmployeeService.class
/**
 * Base type for employee lookups; supplies the shared fallback that records an error
 * response in the {@link ThreadLocalUtil} data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches the details of one employee.
     *
     * @param employeeId identifier of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Fallback for {@link #getEmployee(int)}: stores a fixed message under the
     * {@code ErrorResponse} key of the current thread's data map.
     *
     * @param employeeid identifier of the employee whose lookup failed (not used here)
     */
    public void fallbackMethod(int employeeid) {
        // ThreadLocalUtil exposes static methods only, so it is called through the class
        // rather than through an injected instance.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}
EmployeeServiceImpl.class
@Service
public class EmployeeServiceImpl extends EmployeeService {
@HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
public void getEmployee(int employeeId) {
System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYE_ID"));
String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
}, employeeId).getBody();
threadLocalUtil.addDataToThreadLocalMap("Response", response);
}
@Autowired
RestTemplate restTemplate;
@Autowired
private ThreadLocalUtil threadLocalUtil;
}
首先,Hystrix 内部使用 ThreadPoolExecutor(线程只创建一次并被重复使用),因此在这里使用 InheritableThreadLocal 是错误的。
从上面的问题和你在我的 blog 中提出的问题,我了解到你的问题是
InheritableThreadLocal
becomes null in hystrix fallback method
进一步添加(您可以验证)
InheritableThreadLocal
becomes null in hystrix fallback method only in case of timeouts and not in case of any other exception
我会推荐其他人参考我的blog。 Hystrix 回退 在超时的情况下,发生在 hystrix-timer 线程中。
Hystrix fallback execution thread
您可以通过记录 Thread.currentThread().getName()
来验证这一点
由于 hystrix-timer 线程的父线程并不是您的调用线程,因此 transmittableThreadLocal.get() 会返回 null。
为了解决这个问题,我建议使用 HystrixCommandExecutionHook and HystrixRequestVariableDefault。使用这个你可以实现像 onStart, onExecutionStart, onFallbackStart
等的钩子,你需要在其中 get/set threadLocal 变量。更多详情可以参考博客最后一节
更新:
对于您的用例,您可以按如下方式修改代码:
ThreadLocalUtil.java
/**
 * Static helper around a per-thread data map used to carry request-scoped values.
 *
 * <p>The map is held in a plain {@link ThreadLocal}. {@link #getThreadLocalData()} and
 * {@link #setThreadLocalData(ConcurrentHashMap)} let a caller read the map on one thread and
 * bind it on another.
 */
public class ThreadLocalUtil {

    /** Holder of the data map bound to the current thread; never reassigned. */
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal =
            new ThreadLocal<>();

    /**
     * @return the data map bound to the current thread, or {@code null} when none is bound
     */
    public static ConcurrentHashMap<String, Object> getThreadLocalData() {
        return transmittableThreadLocal.get();
    }

    /**
     * Binds {@code data} to the current thread, replacing whatever was bound before.
     *
     * @param data map to bind; may be {@code null}
     */
    public static void setThreadLocalData(ConcurrentHashMap<String, Object> data) {
        transmittableThreadLocal.set(data);
    }

    /**
     * Stores {@code value} under {@code key} in the current thread's data map.
     * A {@code null} value is ignored.
     *
     * @param key   entry key
     * @param value entry value; nothing is stored when it is {@code null}
     * @throws NullPointerException if a non-null value is written while no data map is bound
     *                              to the current thread (same exception type as before, now
     *                              with an explanatory message)
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = java.util.Objects.requireNonNull(
                transmittableThreadLocal.get(),
                "No data map is bound to thread '" + Thread.currentThread().getName()
                        + "'; call initThreadLocals() or setThreadLocalData() first");
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value, or {@code null} when the key is absent or no data map is bound
     *         to the current thread
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        // Previously this dereferenced a possibly-null map and failed with a bare NPE.
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Unbinds the data map from the current thread. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value (may be {@code null} when the key is absent), or the string
     *         {@code "-1"} when no data map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh data map to the current thread and stores a {@code REQUEST_ID} entry
     * derived from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        transmittableThreadLocal.set(new ConcurrentHashMap<>());
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        addDataToThreadLocalMap("REQUEST_ID", requestId);
    }
}
EmployeeService.java
/**
 * Base type for employee lookups; supplies the shared fallback that records an error
 * response in the {@link ThreadLocalUtil} data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches the details of one employee.
     *
     * @param employeeId identifier of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Fallback for {@link #getEmployee(int)}: stores a fixed message under the
     * {@code ErrorResponse} key of the current thread's data map.
     *
     * @param employeeid identifier of the employee whose lookup failed (not used here)
     */
    public void fallbackMethod(int employeeid) {
        // This class declares no threadLocalUtil field, so the static method is invoked
        // through the class name.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}
EmployeeServiceImpl.java
@Service
public class EmployeeServiceImpl extends EmployeeService {
@HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
public void getEmployee(int employeeId) {
System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
}, employeeId).getBody();
threadLocalUtil.addDataToThreadLocalMap("Response", response);
}
@Autowired
RestTemplate restTemplate;
}
HystrixHook.java
/**
 * Hystrix execution hook that carries the {@link ThreadLocalUtil} data map across a command:
 * the map is captured into a Hystrix request variable when the command starts and bound
 * again through {@link ThreadLocalUtil} when execution or fallback starts.
 */
public class HystrixHook extends HystrixCommandExecutionHook {

    /** Request variable that holds the captured data map for the duration of a command. */
    private final HystrixRequestVariableDefault<ConcurrentHashMap<String, Object>> hrv = new HystrixRequestVariableDefault<>();

    /** Initializes a Hystrix request context and captures the current thread's data map. */
    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
        getThreadLocals();
    }

    /** Binds the captured data map on the thread this callback runs on. */
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }

    /** Binds the captured data map on the thread this callback runs on. */
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }

    /** Shuts down the request context, if one is present, then delegates to the superclass. */
    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        shutdownRequestContext();
        super.onSuccess(commandInstance);
    }

    /**
     * Shuts down the request context, if one is present, then delegates to the superclass.
     *
     * @return whatever the superclass implementation returns for {@code e}
     */
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        shutdownRequestContext();
        return super.onError(commandInstance, failureType, e);
    }

    /**
     * Shuts down the request context of the current thread when there is one. The lookup can
     * yield {@code null}; dereferencing it directly would raise an NPE inside the hook.
     */
    private void shutdownRequestContext() {
        HystrixRequestContext context = HystrixRequestContext.getContextForCurrentThread();
        if (context != null) {
            context.shutdown();
        }
    }

    /** Copies the current thread's data map into the request variable. */
    private void getThreadLocals() {
        hrv.set(ThreadLocalUtil.getThreadLocalData());
    }

    /** Copies the request variable's data map into the current thread's ThreadLocalUtil slot. */
    private void setThreadLocals() {
        ThreadLocalUtil.setThreadLocalData(hrv.get());
    }
}
AbcApplication.java
/** Application entry point: registers the Hystrix execution hook, then starts Spring. */
public class AbcApplication {

    /**
     * Registers a {@link HystrixHook} with the Hystrix plugin registry and launches the
     * Spring application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String[] args) {
        // The hook is installed before SpringApplication.run is invoked.
        HystrixPlugins plugins = HystrixPlugins.getInstance();
        plugins.registerCommandExecutionHook(new HystrixHook());
        // NOTE(review): Spring is started with Abc.class, not AbcApplication.class —
        // confirm that this is the intended configuration class.
        SpringApplication.run(Abc.class, args);
    }
}
希望对您有所帮助
我有一个 InheritableThreadLocal<ConcurrentHashMap<String, Object>> 线程变量,它在请求经过过滤器时被初始化,并在其中设置了一些 transaction_id。
现在在服务层,我通过 CompletableFuture 发起了 10 个不同的 API 调用。每个 API 服务类都有一个 execute 方法,它使用 RestTemplate 进行 API 调用。我在 execute 方法上加了 @HystrixCommand 注解。
execute 方法是 void 类型,但它将 API 响应放在 InheritableThreadLocal 对象中。
问题在于:当 API 调用失败、Hystrix 转而调用 FallBackMethod 时,我把错误响应放进了 InheritableThreadLocal,但无法把这个错误响应发送给客户端。
ThreadLocalUtil.class
/**
 * Static helper around a per-thread data map used to carry request-scoped values.
 *
 * <p>The map is held in an {@link InheritableThreadLocal}. It must be bound with
 * {@link #initThreadLocals()} before values are written and released with
 * {@link #clearThreadLocalDataMap()} when the request is finished.
 */
public class ThreadLocalUtil {

    /** Holder of the data map bound to the current thread; never reassigned. */
    private static final InheritableThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal =
            new InheritableThreadLocal<>();

    /**
     * Stores {@code value} under {@code key} in the current thread's data map.
     * A {@code null} value is ignored.
     *
     * @param key   entry key
     * @param value entry value; nothing is stored when it is {@code null}
     * @throws NullPointerException if a non-null value is written while no data map is bound
     *                              to the current thread (same exception type as before, now
     *                              with an explanatory message)
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = java.util.Objects.requireNonNull(
                transmittableThreadLocal.get(),
                "No data map is bound to thread '" + Thread.currentThread().getName()
                        + "'; call initThreadLocals() first");
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value, or {@code null} when the key is absent or no data map is bound
     *         to the current thread
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        // Previously this dereferenced a possibly-null map and failed with a bare NPE.
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Unbinds the data map from the current thread. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value (may be {@code null} when the key is absent), or the string
     *         {@code "-1"} when no data map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh data map to the current thread, pre-populated with a {@code REQUEST_ID}
     * entry derived from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        ConcurrentHashMap<String, Object> dataForDataMap = new ConcurrentHashMap<String, Object>();
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        dataForDataMap.put("REQUEST_ID", requestId);
        transmittableThreadLocal.set(dataForDataMap);
    }
}
CommonFilter.class
@Component
@Order(1)
public class CommonFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
try {
ThreadLocalUtil.initThreadLocals();
filterChain.doFilter(request, response);
} catch (Exception e) {
if (e instanceof ServletException) {
throw (ServletException) e;
}
} finally {
ThreadLocalUtil.clearThreadLocalDataMap();
}
}
EmployeeService.class
/**
 * Base type for employee lookups; supplies the shared fallback that records an error
 * response in the {@link ThreadLocalUtil} data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches the details of one employee.
     *
     * @param employeeId identifier of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Fallback for {@link #getEmployee(int)}: stores a fixed message under the
     * {@code ErrorResponse} key of the current thread's data map.
     *
     * @param employeeid identifier of the employee whose lookup failed (not used here)
     */
    public void fallbackMethod(int employeeid) {
        // ThreadLocalUtil exposes static methods only, so it is called through the class
        // rather than through an injected instance.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}
EmployeeServiceImpl.class
@Service
public class EmployeeServiceImpl extends EmployeeService {
@HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
public void getEmployee(int employeeId) {
System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYE_ID"));
String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
}, employeeId).getBody();
threadLocalUtil.addDataToThreadLocalMap("Response", response);
}
@Autowired
RestTemplate restTemplate;
@Autowired
private ThreadLocalUtil threadLocalUtil;
}
首先,Hystrix 内部使用 ThreadPoolExecutor(线程只创建一次并被重复使用),因此在这里使用 InheritableThreadLocal 是错误的。
从上面的问题和你在我的 blog 中提出的问题,我了解到你的问题是
InheritableThreadLocal
becomes null in hystrix fallback method
进一步添加(您可以验证)
InheritableThreadLocal
becomes null in hystrix fallback method only in case of timeouts and not in case of any other exception
我会推荐其他人参考我的blog。 Hystrix 回退 在超时的情况下,发生在 hystrix-timer 线程中。
Hystrix fallback execution thread
您可以通过记录 Thread.currentThread().getName() 来验证这一点。
由于 hystrix-timer 线程的父线程并不是您的调用线程,因此 transmittableThreadLocal.get() 会返回 null。
为了解决这个问题,我建议使用 HystrixCommandExecutionHook and HystrixRequestVariableDefault。使用这个你可以实现像 onStart, onExecutionStart, onFallbackStart
等的钩子,你需要在其中 get/set threadLocal 变量。更多详情可以参考博客最后一节
更新: 对于您的用例,您可以按如下方式修改代码:
ThreadLocalUtil.java
/**
 * Static helper around a per-thread data map used to carry request-scoped values.
 *
 * <p>The map is held in a plain {@link ThreadLocal}. {@link #getThreadLocalData()} and
 * {@link #setThreadLocalData(ConcurrentHashMap)} let a caller read the map on one thread and
 * bind it on another.
 */
public class ThreadLocalUtil {

    /** Holder of the data map bound to the current thread; never reassigned. */
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal =
            new ThreadLocal<>();

    /**
     * @return the data map bound to the current thread, or {@code null} when none is bound
     */
    public static ConcurrentHashMap<String, Object> getThreadLocalData() {
        return transmittableThreadLocal.get();
    }

    /**
     * Binds {@code data} to the current thread, replacing whatever was bound before.
     *
     * @param data map to bind; may be {@code null}
     */
    public static void setThreadLocalData(ConcurrentHashMap<String, Object> data) {
        transmittableThreadLocal.set(data);
    }

    /**
     * Stores {@code value} under {@code key} in the current thread's data map.
     * A {@code null} value is ignored.
     *
     * @param key   entry key
     * @param value entry value; nothing is stored when it is {@code null}
     * @throws NullPointerException if a non-null value is written while no data map is bound
     *                              to the current thread (same exception type as before, now
     *                              with an explanatory message)
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = java.util.Objects.requireNonNull(
                transmittableThreadLocal.get(),
                "No data map is bound to thread '" + Thread.currentThread().getName()
                        + "'; call initThreadLocals() or setThreadLocalData() first");
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value, or {@code null} when the key is absent or no data map is bound
     *         to the current thread
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        // Previously this dereferenced a possibly-null map and failed with a bare NPE.
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Unbinds the data map from the current thread. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the current thread's data map.
     *
     * @param key entry key
     * @return the stored value (may be {@code null} when the key is absent), or the string
     *         {@code "-1"} when no data map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh data map to the current thread and stores a {@code REQUEST_ID} entry
     * derived from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        transmittableThreadLocal.set(new ConcurrentHashMap<>());
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        addDataToThreadLocalMap("REQUEST_ID", requestId);
    }
}
EmployeeService.java
/**
 * Base type for employee lookups; supplies the shared fallback that records an error
 * response in the {@link ThreadLocalUtil} data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches the details of one employee.
     *
     * @param employeeId identifier of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Fallback for {@link #getEmployee(int)}: stores a fixed message under the
     * {@code ErrorResponse} key of the current thread's data map.
     *
     * @param employeeid identifier of the employee whose lookup failed (not used here)
     */
    public void fallbackMethod(int employeeid) {
        // This class declares no threadLocalUtil field, so the static method is invoked
        // through the class name.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}
EmployeeServiceImpl.java
@Service
public class EmployeeServiceImpl extends EmployeeService {
@HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
public void getEmployee(int employeeId) {
System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
}, employeeId).getBody();
threadLocalUtil.addDataToThreadLocalMap("Response", response);
}
@Autowired
RestTemplate restTemplate;
}
HystrixHook.java
/**
 * Hystrix execution hook that carries the {@link ThreadLocalUtil} data map across a command:
 * the map is captured into a Hystrix request variable when the command starts and bound
 * again through {@link ThreadLocalUtil} when execution or fallback starts.
 */
public class HystrixHook extends HystrixCommandExecutionHook {

    /** Request variable that holds the captured data map for the duration of a command. */
    private final HystrixRequestVariableDefault<ConcurrentHashMap<String, Object>> hrv = new HystrixRequestVariableDefault<>();

    /** Initializes a Hystrix request context and captures the current thread's data map. */
    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
        getThreadLocals();
    }

    /** Binds the captured data map on the thread this callback runs on. */
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }

    /** Binds the captured data map on the thread this callback runs on. */
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }

    /** Shuts down the request context, if one is present, then delegates to the superclass. */
    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        shutdownRequestContext();
        super.onSuccess(commandInstance);
    }

    /**
     * Shuts down the request context, if one is present, then delegates to the superclass.
     *
     * @return whatever the superclass implementation returns for {@code e}
     */
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        shutdownRequestContext();
        return super.onError(commandInstance, failureType, e);
    }

    /**
     * Shuts down the request context of the current thread when there is one. The lookup can
     * yield {@code null}; dereferencing it directly would raise an NPE inside the hook.
     */
    private void shutdownRequestContext() {
        HystrixRequestContext context = HystrixRequestContext.getContextForCurrentThread();
        if (context != null) {
            context.shutdown();
        }
    }

    /** Copies the current thread's data map into the request variable. */
    private void getThreadLocals() {
        hrv.set(ThreadLocalUtil.getThreadLocalData());
    }

    /** Copies the request variable's data map into the current thread's ThreadLocalUtil slot. */
    private void setThreadLocals() {
        ThreadLocalUtil.setThreadLocalData(hrv.get());
    }
}
AbcApplication.java
/** Application entry point: registers the Hystrix execution hook, then starts Spring. */
public class AbcApplication {

    /**
     * Registers a {@link HystrixHook} with the Hystrix plugin registry and launches the
     * Spring application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String[] args) {
        // The hook is installed before SpringApplication.run is invoked.
        HystrixPlugins plugins = HystrixPlugins.getInstance();
        plugins.registerCommandExecutionHook(new HystrixHook());
        // NOTE(review): Spring is started with Abc.class, not AbcApplication.class —
        // confirm that this is the intended configuration class.
        SpringApplication.run(Abc.class, args);
    }
}
希望对您有所帮助