Spring @Async: propagating context information

I have a Spring Boot 2.2 application. I created a service like this:

@Async
@PreAuthorize("hasAnyRole('ROLE_PBX')")
@PlanAuthorization(allowedPlans = {PlanType.BUSINESS, PlanType.ENTERPRISE})
public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
    log.debug("Current tenant {}", TenantContext.getCurrentTenantId());  

    return new AsyncResult<AuditCdr>(auditCdrRepository.save(cdr3CXMapper.cdr3CXDtoToAuditCdr(cdrRecord)));
}

This is my @Async configuration:

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("threadAsync");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}

With SecurityContextHolder.MODE_INHERITABLETHREADLOCAL I can see that the security context is passed to the @Async method. In my multi-tenant application, I use a ThreadLocal to hold the tenant's ID:

public class TenantContext {
    public final static String TENANT_DEFAULT = "empty";
    private static final ThreadLocal<String> code = new ThreadLocal<>();

    public static void setCurrentTenantId(String code) {
        if (code != null)
            TenantContext.code.set(code);
    }

    public static String getCurrentTenantId() {
        String tenantId = code.get();
        if (StringUtils.isNotBlank(tenantId)) {
            return tenantId;
        }
        return TENANT_DEFAULT;
    }

    public static void clear() {
        code.remove();
    }

}

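Because a ThreadLocal is bound to its thread, the tenant ID is not available inside the @Async method. On top of that, my custom @PlanAuthorization AOP aspect needs it to validate the tenant's plan. Is there a clean way to set TenantContext in any @Async method of my application?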
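The behavior described above can be reproduced without Spring. The following is only a minimal sketch: `ThreadLocalLossDemo` and its `TENANT` field are hypothetical stand-ins for `TenantContext`, and a plain `ExecutorService` stands in for the @Async executor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical stand-in for TenantContext: a plain ThreadLocal with a default value.
    static final ThreadLocal<String> TENANT = ThreadLocal.withInitial(() -> "empty");

    // Sets the tenant on the calling thread, then reads it from a pool thread.
    static String readFromWorker(String tenantId) throws Exception {
        TENANT.set(tenantId);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> read = TENANT::get; // evaluated on the worker thread
        try {
            return pool.submit(read).get();
        } finally {
            pool.shutdown();
            TENANT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        // The worker thread has its own (unset) copy, so the default is printed.
        System.out.println(readFromWorker("tenant-a")); // prints "empty"
    }
}
```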

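You have to use InheritableThreadLocal instead of ThreadLocal. Then you will see the values from the parent thread. Note that the values are copied when the child thread is created, so a thread that is reused by a pool keeps whatever it inherited at that point.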

API docs: https://docs.oracle.com/javase/8/docs/api/java/lang/InheritableThreadLocal.html

Here is an article that covers it in combination with Spring: https://medium.com/@hariohmprasath/async-process-using-spring-and-injecting-user-context-6f1af16e9759
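As a rough illustration of how InheritableThreadLocal behaves with a pooled executor, here is a small Spring-free sketch. The class name `InheritableTenantDemo` and the tenant values are made up for the example. It shows that the worker thread inherits the value present when the thread is created, and that a later change on the caller side is not seen by the reused worker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InheritableTenantDemo {
    // Hypothetical tenant holder using InheritableThreadLocal.
    static final InheritableThreadLocal<String> TENANT = new InheritableThreadLocal<>();

    // Returns what a single reused pool thread sees for two consecutive submissions.
    static String[] observe() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> read = TENANT::get; // evaluated on the worker thread
        try {
            TENANT.set("tenant-a");
            // The worker thread is created by this first submit and inherits "tenant-a".
            String first = pool.submit(read).get();

            TENANT.set("tenant-b");
            // The same worker is reused; it still holds the value inherited at creation.
            String second = pool.submit(read).get();

            return new String[] {first, second};
        } finally {
            pool.shutdown();
            TENANT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe();
        System.out.println(seen[0] + " " + seen[1]); // prints "tenant-a tenant-a"
    }
}
```

So inheritance alone is reliable only when each task gets a fresh thread; with a reused pool thread the value has to be handed over per task.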

I ended up using a TaskDecorator:

@Log4j2
public class MdcTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {
        // Right now: web thread context.
        // Capture the current thread's tenant, store, security context and MDC data.
        String tenantId = TenantContext.getCurrentTenantId();
        Long storeId = StoreContext.getCurrentStoreId();
        SecurityContext securityContext = SecurityContextHolder.getContext();
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        log.info("Saving tenant information for async thread...");
        return () -> {
            try {
                // Right now: @Async thread context.
                // Restore the web thread's context on the pooled thread.
                TenantContext.setCurrentTenantId(tenantId);
                StoreContext.setCurrentStoreId(storeId);
                SecurityContextHolder.setContext(securityContext);
                if (contextMap != null) {
                    // getCopyOfContextMap() may return null when the MDC is empty
                    MDC.setContextMap(contextMap);
                }
                log.info("Restoring tenant information for async thread...");
                runnable.run();
            } catch (Throwable e) {
                log.error("Error in async task", e);
            } finally {
                // Pooled threads are reused, so reset everything that was set above.
                MDC.clear();
                TenantContext.clear();
                SecurityContextHolder.clearContext();
                // StoreContext should be reset here as well, if it offers a way to do so.
            }
        };
    }
}

This is how I use it:

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("threadAsync");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setTaskDecorator(new MdcTaskDecorator());
        executor.initialize();
        return executor;
    }
}

It works, and it also seems to be a neat solution.
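For readers who want to try the capture-and-restore idea in isolation, here is a minimal Spring-free sketch of the same pattern. It is not the decorator above: `ContextPropagationDemo`, `TENANT` and the static `decorate` helper are hypothetical names, and a plain `ExecutorService` replaces the ThreadPoolTaskExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ContextPropagationDemo {
    // Hypothetical stand-in for TenantContext.
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    // Captures the caller's tenant now and restores it around the task later.
    static Runnable decorate(Runnable task) {
        String captured = TENANT.get(); // runs on the submitting thread
        return () -> {
            TENANT.set(captured);       // runs on the worker thread
            try {
                task.run();
            } finally {
                TENANT.remove();        // do not leak the value to the next task
            }
        };
    }

    // Returns the tenant seen inside a decorated task and inside a later plain task.
    static String[] observe() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> during = new AtomicReference<>();
        AtomicReference<String> after = new AtomicReference<>();
        try {
            TENANT.set("tenant-a");
            pool.submit(decorate(() -> during.set(TENANT.get()))).get();
            // Undecorated task on the same worker: the value must be gone again.
            pool.submit(() -> after.set(String.valueOf(TENANT.get()))).get();
            return new String[] {during.get(), after.get()};
        } finally {
            pool.shutdown();
            TENANT.remove();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe();
        System.out.println(seen[0] + " " + seen[1]); // prints "tenant-a null"
    }
}
```

Clearing the value in `finally` matters because the worker thread is reused: without it, a later task that was submitted without a tenant would silently run under the previous one.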

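The solution for this case is to:

  1. Configure a custom thread pool and override its execute method so that it sets up the thread local (or does any other work that depends on your main context): decorate the task and submit the decorated task for execution instead of the original one

  2. Tell the @Async annotation to use that specific thread pool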

      @Bean("tenantExecutor")
      public Executor threadLocalAwareThreadPool() {

        final CustomizableThreadFactory threadNameAwareFactory =
          new CustomizableThreadFactory("threadAsync");

        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10,
          0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<>(500), threadNameAwareFactory) {

          // override the thread pool's execute method
          @Override
          public void execute(Runnable originalTask) {
            // read the value on the calling thread, before handing the task to an async thread
            final String tenantId = tenantThreadLocal.get();

            // decorate the actual task: the new Runnable first sets up the thread local,
            // then runs the original task
            super.execute(() -> {
              tenantThreadLocal.set(tenantId); // set the value on the actual async thread
              try {
                originalTask.run();
              } finally {
                tenantThreadLocal.remove(); // pooled threads are reused, so do not leave the value behind
              }
            });
          }
        };

        return threadPoolExecutor;

      }
    

Now we tell Spring to use our custom executor:

    @Async("tenantExecutor") 
    public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
        // your code....
    }