Spring @Async 传播上下文信息
Spring @Async propagate context information
我有一个 Spring Boot 2.2 应用程序。我创建了这样的服务:
@Async
@PreAuthorize("hasAnyRole('ROLE_PBX')")
@PlanAuthorization(allowedPlans = {PlanType.BUSINESS, PlanType.ENTERPRISE})
public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
log.debug("Current tenant {}", TenantContext.getCurrentTenantId());
return new AsyncResult<AuditCdr>(auditCdrRepository.save(cdr3CXMapper.cdr3CXDtoToAuditCdr(cdrRecord)));
}
这是我的@Async 配置:
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("threadAsync");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
使用 SecurityContextHolder.MODE_INHERITABLETHREADLOCAL
我看到安全上下文已传递给 @Async 方法。
在我的多租户应用程序中,我使用 ThreadLocal 来设置租户的 ID:
public class TenantContext {
public final static String TENANT_DEFAULT = "empty";
private static final ThreadLocal<String> code = new ThreadLocal<>();
public static void setCurrentTenantId(String code) {
if (code != null)
TenantContext.code.set(code);
}
public static String getCurrentTenantId() {
String tenantId = code.get();
if (StringUtils.isNotBlank(tenantId)) {
return tenantId;
}
return TENANT_DEFAULT;
}
public static void clear() {
code.remove();
}
}
因为ThreadLocal是跟线程相关的,所以在@Async方法中是没有的。此外,我的自定义 @PlanAuthorization
aop 需要它来执行租户计划的验证。
在我的应用程序的任何 @Async 方法中是否有一种干净的方法来设置 TenantContext?
您必须使用 InheritableThreadLocal 而不是 ThreadLocal。然后你会看到来自父线程的值。
API 文档:https://docs.oracle.com/javase/8/docs/api/java/lang/InheritableThreadLocal.html
这里有一篇结合Spring的文章:https://medium.com/@hariohmprasath/async-process-using-spring-and-injecting-user-context-6f1af16e9759
我最终使用了 TaskDecorator:
@Log4j2
public class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Right now: Web thread context !
// (Grab the current thread MDC data)
String tenantId = TenantContext.getCurrentTenantId();
Long storeId = StoreContext.getCurrentStoreId();
SecurityContext securityContext = SecurityContextHolder.getContext();
Map<String, String> contextMap = MDC.getCopyOfContextMap();
log.info("Saving tenant information for async thread...");
return () -> {
try {
// Right now: @Async thread context !
// (Restore the Web thread context's MDC data)
TenantContext.setCurrentTenantId(tenantId);
StoreContext.setCurrentStoreId(storeId);
SecurityContextHolder.setContext(securityContext);
MDC.setContextMap(contextMap);
log.info("Restoring tenant information for async thread...");
runnable.run();
} catch (Throwable e) {
log.error("Error in async task", e);
} finally {
MDC.clear();
}
};
}
}
我是这样使用的:
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("threadAsync");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
return executor;
}
}
它有效,而且似乎也是一个很好的解决方案。
这种情况的解决方法是:
配置自定义线程池,以便您覆盖它的执行方法来设置本地线程(或
从您的主要上下文执行任何任务),装饰任务并提交装饰任务以执行而不是原始任务
指示@Async 注解使用具体的线程池
@Bean("tenantExecutor")
public Executor threadLocalAwareThreadPool() {
final CustomizableThreadFactory threadNameAwareFactory =
new CustomizableThreadFactory("threadAsync");
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(500), threadNameAwareFactory) {
// override original method of thread pool
@Override
public void execute(Runnable originalTask) {
final String tenantId = tenantThreadLocal.get(); // read data from current before passing the task to async thread
// decorate the actual task by creating new task (Runnable) where you first set up the thread local and then execute your actual task
super.execute(() -> {
tenantThreadLocal.set(tenantId); // set data in actual async thread
originalTask.run();
});
}
};
return threadPoolExecutor;
}
现在我们告诉 spring 使用我们的自定义执行器
@Async("tenantExecutor")
public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
// your code....
}
我有一个 Spring Boot 2.2 应用程序。我创建了这样的服务:
@Async
@PreAuthorize("hasAnyRole('ROLE_PBX')")
@PlanAuthorization(allowedPlans = {PlanType.BUSINESS, PlanType.ENTERPRISE})
public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
log.debug("Current tenant {}", TenantContext.getCurrentTenantId());
return new AsyncResult<AuditCdr>(auditCdrRepository.save(cdr3CXMapper.cdr3CXDtoToAuditCdr(cdrRecord)));
}
这是我的@Async 配置:
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("threadAsync");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
使用 SecurityContextHolder.MODE_INHERITABLETHREADLOCAL
我看到安全上下文已传递给 @Async 方法。
在我的多租户应用程序中,我使用 ThreadLocal 来设置租户的 ID:
public class TenantContext {
public final static String TENANT_DEFAULT = "empty";
private static final ThreadLocal<String> code = new ThreadLocal<>();
public static void setCurrentTenantId(String code) {
if (code != null)
TenantContext.code.set(code);
}
public static String getCurrentTenantId() {
String tenantId = code.get();
if (StringUtils.isNotBlank(tenantId)) {
return tenantId;
}
return TENANT_DEFAULT;
}
public static void clear() {
code.remove();
}
}
因为ThreadLocal是跟线程相关的,所以在@Async方法中是没有的。此外,我的自定义 @PlanAuthorization
aop 需要它来执行租户计划的验证。
在我的应用程序的任何 @Async 方法中是否有一种干净的方法来设置 TenantContext?
您必须使用 InheritableThreadLocal 而不是 ThreadLocal。然后你会看到来自父线程的值。
API 文档:https://docs.oracle.com/javase/8/docs/api/java/lang/InheritableThreadLocal.html
这里有一篇结合Spring的文章:https://medium.com/@hariohmprasath/async-process-using-spring-and-injecting-user-context-6f1af16e9759
我最终使用了 TaskDecorator:
@Log4j2
public class MdcTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Right now: Web thread context !
// (Grab the current thread MDC data)
String tenantId = TenantContext.getCurrentTenantId();
Long storeId = StoreContext.getCurrentStoreId();
SecurityContext securityContext = SecurityContextHolder.getContext();
Map<String, String> contextMap = MDC.getCopyOfContextMap();
log.info("Saving tenant information for async thread...");
return () -> {
try {
// Right now: @Async thread context !
// (Restore the Web thread context's MDC data)
TenantContext.setCurrentTenantId(tenantId);
StoreContext.setCurrentStoreId(storeId);
SecurityContextHolder.setContext(securityContext);
MDC.setContextMap(contextMap);
log.info("Restoring tenant information for async thread...");
runnable.run();
} catch (Throwable e) {
log.error("Error in async task", e);
} finally {
MDC.clear();
}
};
}
}
我是这样使用的:
@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("threadAsync");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setTaskDecorator(new MdcTaskDecorator());
executor.initialize();
return executor;
}
}
它有效,而且似乎也是一个很好的解决方案。
这种情况的解决方法是:
配置自定义线程池,以便您覆盖它的执行方法来设置本地线程(或 从您的主要上下文执行任何任务),装饰任务并提交装饰任务以执行而不是原始任务
指示@Async 注解使用具体的线程池
@Bean("tenantExecutor") public Executor threadLocalAwareThreadPool() { final CustomizableThreadFactory threadNameAwareFactory = new CustomizableThreadFactory("threadAsync"); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(500), threadNameAwareFactory) { // override original method of thread pool @Override public void execute(Runnable originalTask) { final String tenantId = tenantThreadLocal.get(); // read data from current before passing the task to async thread // decorate the actual task by creating new task (Runnable) where you first set up the thread local and then execute your actual task super.execute(() -> { tenantThreadLocal.set(tenantId); // set data in actual async thread originalTask.run(); }); } }; return threadPoolExecutor; }
现在我们告诉 spring 使用我们的自定义执行器
@Async("tenantExecutor")
public Future<AuditCdr> saveCDR(Cdr3CXDto cdrRecord) {
// your code....
}