无法在回退方法中从父线程访问 InheritableThreadLocal 对象

Not able to access InheritableThreadLocal object from parent thread in fall back method

我有一个 InheritableThreadLocal<ConcurrentHashMap<String, Object>> 变量,它在请求经过过滤器时被初始化,并在其中设置一些 transaction_id。

现在在服务层,我通过 CompletableFuture 发起了 10 个不同的 API 调用。所有 API 服务类都有一个 execute 方法,使用 RestTemplate 进行 API 调用。我在 execute 方法上加了 @HystrixCommand 注解。

execute 方法是 void 类型,但它将 API 响应放在 InheritableThreadLocal 对象中。

问题是:当 API 调用失败时,Hystrix 会调用 FallBackMethod;而当我在回退方法中将错误响应放入 InheritableThreadLocal 后,却无法将该错误响应发送给客户端。

ThreadLocalUtil.class

/**
 * Static helpers around a per-thread data map stored in an {@link InheritableThreadLocal}.
 *
 * <p>The map is bound to the current thread by {@link #initThreadLocals()} and unbound by
 * {@link #clearThreadLocalDataMap()}. Reads on a thread that has no map bound return a
 * default instead of failing; writes on such a thread fail with a descriptive exception so
 * that data is never dropped silently.
 */
public class ThreadLocalUtil {

    private static final InheritableThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new InheritableThreadLocal<>();

    /**
     * Stores {@code value} under {@code key} in the map bound to the current thread.
     * A {@code null} value is ignored.
     *
     * @param key   map key
     * @param value value to store; {@code null} makes this call a no-op
     * @throws IllegalStateException if {@code value} is non-null and no map is bound to the
     *                               current thread
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap == null) {
            // Previously this surfaced as a bare NullPointerException with no hint of the cause.
            throw new IllegalStateException("No thread-local data map is bound to thread '"
                    + Thread.currentThread().getName() + "'; cannot store key '" + key + "'");
        }
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the map bound to the current thread.
     *
     * @param key map key
     * @return the stored value, or {@code null} if the key is absent or no map is bound
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Removes the current thread's binding to the data map. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the map bound to the current thread.
     *
     * @param key map key
     * @return the stored value (possibly {@code null} if the key is absent), or the string
     *         {@code "-1"} if no map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh map to the current thread, pre-populated with a {@code REQUEST_ID} entry
     * derived from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        ConcurrentHashMap<String, Object> dataForDataMap = new ConcurrentHashMap<>();
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        dataForDataMap.put("REQUEST_ID", requestId);
        transmittableThreadLocal.set(dataForDataMap);
    }
}

CommonFilter.class

@Component
@Order(1)
public class CommonFilter extends OncePerRequestFilter {

  @Override
  protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
          throws ServletException, IOException {
      try {
          ThreadLocalUtil.initThreadLocals();
          filterChain.doFilter(request, response);
      } catch (Exception e) {
          if (e instanceof ServletException) {
              throw (ServletException) e;
          }
      } finally {
          ThreadLocalUtil.clearThreadLocalDataMap();
      }

  }

EmployeeService.class

/**
 * Base type for employee lookups. Subclasses implement {@link #getEmployee(int)};
 * {@link #fallbackMethod(int)} records an error response in the thread-local data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches details for the given employee.
     *
     * @param employeeId id of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Stores a fixed fallback message under the {@code "ErrorResponse"} key of the
     * thread-local data map.
     *
     * @param employeeid id of the employee whose lookup failed (not used in the message)
     */
    public void fallbackMethod(int employeeid) {
        // addDataToThreadLocalMap is static, so it is called on the class; no injected
        // ThreadLocalUtil instance is needed.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

EmployeeServiceImpl.class

/**
 * {@link EmployeeService} implementation that fetches employee details over HTTP and stores
 * the response body in the thread-local data map.
 */
@Service
public class EmployeeServiceImpl extends EmployeeService {

    @Autowired
    RestTemplate restTemplate;

    /**
     * Calls the employee-details endpoint and stores the response body under the
     * {@code "Response"} key of the thread-local data map. Declared as a Hystrix command
     * with a 900 ms execution timeout and {@code fallbackMethod} as its fallback.
     *
     * @param employeeId id substituted into the request URL
     */
    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        // NOTE(review): the key "EMPLOYE_ID" looks like a misspelling of "EMPLOYEE_ID" — confirm
        // against whatever code writes this entry before changing it.
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + ThreadLocalUtil.getDataFromThreadLocalMap("EMPLOYE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        // Static utility: called on the class rather than through an injected instance.
        ThreadLocalUtil.addDataToThreadLocalMap("Response", response);
    }
}

所以,首先因为内部Hystrix使用ThreadPoolExecutor(线程创建一次并重复使用),所以使用InheritableThreadLocal是错误的。

从上面的问题和你在我的 blog 中提出的问题,我了解到你的问题是

InheritableThreadLocal becomes null in hystrix fallback method

进一步添加(您可以验证)

InheritableThreadLocal becomes null in hystrix fallback method only in case of timeouts and not in case of any other exception

我建议其他人参考我的博客文章《Hystrix fallback execution thread》。在超时的情况下,Hystrix 回退发生在 hystrix-timer 线程中。您可以通过记录 Thread.currentThread().getName() 来验证这一点。

由于 hystrix-timer 线程的父线程不是您的调用线程,因此您的 transmittableThreadLocal.get() 会返回 null。

为了解决这个问题,我建议使用 HystrixCommandExecutionHook 和 HystrixRequestVariableDefault。借助它们,您可以实现 onStart、onExecutionStart、onFallbackStart 等钩子,并在这些钩子中读取/设置 threadLocal 变量。更多详情可以参考博客的最后一节。

更新: 对于您的用例,您可以按如下方式修改代码:

ThreadLocalUtil.java

/**
 * Static helpers around a per-thread data map stored in a plain {@link ThreadLocal}.
 *
 * <p>The map is bound to the current thread by {@link #initThreadLocals()} or
 * {@link #setThreadLocalData(ConcurrentHashMap)} and unbound by
 * {@link #clearThreadLocalDataMap()}. Reads on a thread that has no map bound return a
 * default instead of failing; writes on such a thread fail with a descriptive exception so
 * that data is never dropped silently.
 */
public class ThreadLocalUtil {

    private static final ThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new ThreadLocal<>();

    /**
     * @return the map bound to the current thread, or {@code null} if none is bound
     */
    public static ConcurrentHashMap<String, Object> getThreadLocalData() {
        return transmittableThreadLocal.get();
    }

    /**
     * Binds {@code data} to the current thread, replacing any previous binding.
     *
     * @param data map to bind; may be {@code null}
     */
    public static void setThreadLocalData(ConcurrentHashMap<String, Object> data) {
        transmittableThreadLocal.set(data);
    }

    /**
     * Stores {@code value} under {@code key} in the map bound to the current thread.
     * A {@code null} value is ignored.
     *
     * @param key   map key
     * @param value value to store; {@code null} makes this call a no-op
     * @throws IllegalStateException if {@code value} is non-null and no map is bound to the
     *                               current thread
     */
    public static void addDataToThreadLocalMap(String key, Object value) {
        if (value == null) {
            return;
        }
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap == null) {
            // Previously this surfaced as a bare NullPointerException with no hint of the cause.
            throw new IllegalStateException("No thread-local data map is bound to thread '"
                    + Thread.currentThread().getName() + "'; cannot store key '" + key + "'");
        }
        existingDataMap.put(key, value);
    }

    /**
     * Looks up {@code key} in the map bound to the current thread.
     *
     * @param key map key
     * @return the stored value, or {@code null} if the key is absent or no map is bound
     */
    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap == null ? null : existingDataMap.get(key);
    }

    /** Removes the current thread's binding to the data map. */
    public static void clearThreadLocalDataMap() {
        transmittableThreadLocal.remove();
    }

    /**
     * Looks up {@code key} in the map bound to the current thread.
     *
     * @param key map key
     * @return the stored value (possibly {@code null} if the key is absent), or the string
     *         {@code "-1"} if no map is bound to the current thread
     */
    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    /**
     * Binds a fresh map to the current thread and stores a {@code REQUEST_ID} entry derived
     * from the current time in milliseconds.
     */
    public static void initThreadLocals() {
        transmittableThreadLocal.set(new ConcurrentHashMap<>());
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        addDataToThreadLocalMap("REQUEST_ID", requestId);
    }
}

EmployeeService.java

/**
 * Base type for employee lookups. Subclasses implement {@link #getEmployee(int)};
 * {@link #fallbackMethod(int)} records an error response in the thread-local data map.
 */
@Component
public abstract class EmployeeService {

    /**
     * Fetches details for the given employee.
     *
     * @param employeeId id of the employee to look up
     */
    public abstract void getEmployee(int employeeId);

    /**
     * Stores a fixed fallback message under the {@code "ErrorResponse"} key of the
     * thread-local data map.
     *
     * @param employeeid id of the employee whose lookup failed (not used in the message)
     */
    public void fallbackMethod(int employeeid) {
        // Call the static method on the class: this type declares no threadLocalUtil field,
        // so the former instance-style call did not compile.
        ThreadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

EmployeeServiceImpl.java

/**
 * {@link EmployeeService} implementation that fetches employee details over HTTP and stores
 * the response body in the thread-local data map.
 */
@Service
public class EmployeeServiceImpl extends EmployeeService {

    @Autowired
    RestTemplate restTemplate;

    /**
     * Calls the employee-details endpoint and stores the response body under the
     * {@code "Response"} key of the thread-local data map. Declared as a Hystrix command
     * with a 900 ms execution timeout and {@code fallbackMethod} as its fallback.
     *
     * @param employeeId id substituted into the request URL
     */
    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        // Static calls on ThreadLocalUtil: no threadLocalUtil field is declared in this class
        // or its parent, so the former instance-style calls did not compile.
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + ThreadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        ThreadLocalUtil.addDataToThreadLocalMap("Response", response);
    }
}

HystrixHook.java

/**
 * Hystrix execution hook that carries the {@link ThreadLocalUtil} data map across the threads
 * Hystrix uses for a command.
 *
 * <p>On command start the caller's map is captured into a Hystrix request variable; before
 * the command body or the fallback runs, that map is bound to the executing thread via
 * {@link ThreadLocalUtil#setThreadLocalData}.
 */
public class HystrixHook extends HystrixCommandExecutionHook {

    private final HystrixRequestVariableDefault<ConcurrentHashMap<String, Object>> hrv = new HystrixRequestVariableDefault<>();

    /** Initializes a Hystrix request context on the current thread and captures its data map. */
    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        // NOTE(review): this replaces any request context already set on this thread (e.g. by
        // an outer filter or an enclosing command) — confirm that is acceptable.
        HystrixRequestContext.initializeContext();
        getThreadLocals();
    }

    /** Binds the captured data map to the thread about to run the command body. */
    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    /** Binds the captured data map to the thread about to run the fallback. */
    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    /** Shuts down the request context, if any, after the command succeeds. */
    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        shutdownRequestContext();
        super.onSuccess(commandInstance);
    }

    /** Shuts down the request context, if any, after the command fails. */
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        shutdownRequestContext();
        return super.onError(commandInstance, failureType, e);
    }

    /** Shuts down the current thread's request context when one exists. */
    private void shutdownRequestContext() {
        // getContextForCurrentThread() returns null when no context is initialized on this
        // thread; guard so the completion hooks cannot fail with a NullPointerException.
        HystrixRequestContext context = HystrixRequestContext.getContextForCurrentThread();
        if (context != null) {
            context.shutdown();
        }
    }

    /** Copies the current thread's data map into the Hystrix request variable. */
    private void getThreadLocals() {
        hrv.set(ThreadLocalUtil.getThreadLocalData());
    }

    /** Copies the Hystrix request variable into the current thread's data map binding. */
    private void setThreadLocals() {
        // Reading a request variable without an initialized context throws, so skip the
        // copy on threads that have no context.
        if (HystrixRequestContext.isCurrentThreadInitialized()) {
            ThreadLocalUtil.setThreadLocalData(hrv.get());
        }
    }
}

AbcApplication.java

/**
 * Application entry point: registers the Hystrix execution hook, then starts Spring.
 */
public class AbcApplication {

    /**
     * Registers a {@code HystrixHook} with the Hystrix plugin registry and then launches the
     * Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        // The hook is registered before SpringApplication.run is invoked.
        HystrixPlugins plugins = HystrixPlugins.getInstance();
        HystrixHook executionHook = new HystrixHook();
        plugins.registerCommandExecutionHook(executionHook);

        // NOTE(review): Abc.class is not defined in the visible code; it may be intended to be
        // AbcApplication.class — confirm.
        SpringApplication.run(Abc.class, args);
    }
}

希望对您有所帮助