java 8 个使用 ForkJoinPool 和 ThreadLocal 的并行流
java 8 parallel stream with ForkJoinPool and ThreadLocal
我们正在使用java8个并行流来处理任务,我们正在通过ForkJoinPool#submit提交任务。我们没有使用 jvm wide ForkJoinPool.commonPool,而是创建了我们自己的自定义池来指定并行度并将其存储为静态变量。
我们有验证框架,我们将表列表置于验证器列表中,我们通过自定义 ForkJoinPool 提交此作业,如下所示:
static ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
.getInstance().getTableValidator();
List<ValidationResult> result = forkJoinPool.submit(
() -> tables.stream()
.parallel()
.map(validator)
.filter(result -> result.getValidationMessages().size() > 0)
.collect(Collectors.toList())).get();
我们遇到的问题是,在下游组件中,运行 在与静态 ForkJoinPool 不同的线程上的各个验证器依赖于 tenant_id,每个请求都不同,并存储在 InheritableThreadLocal 变量中。由于我们正在创建一个静态的 ForkJoinPool,因此 ForkJoinPool 池中的线程只会在第一次创建时继承父线程的值。但是这些池线程不会知道当前请求的新 tenant_id。因此,对于后续执行,这些池线程使用旧的 tenant_id.
我尝试创建自定义 ForkJoinPool 并在构造函数中指定 ForkJoinWorkerThreadFactory 并重写 onStart 方法以提供新的 tenant_id。但这不起作用,因为 onStart 方法仅在创建时调用一次,而不是在单独执行时调用。
似乎我们需要像 ThreadPoolExecutor#beforeExecute 这样的东西,它在 ForkJoinPool 的情况下不可用。那么如果我们想将当前线程本地值传递给静态池线程,我们有什么选择呢?
一种解决方法是为每个请求创建 ForkJoinPool,而不是使其成为静态的,但我们不想这样做,以避免线程创建的昂贵性质。
我们有什么选择?
我认为最好的选择是摆脱本地线程并将其作为参数传递。我知道这可能是一项艰巨的任务。另一种选择是使用包装器。
假设你的验证器有一个验证方法,你可以这样做:
public class WrappingModelValidator implements ModelValidator<Table. ValidationResult> {
private final ModelValidator<Table. ValidationResult> v;
private final String tenantId;
public WrappingModelValidator(ModelValidator<Table. ValidationResult> v, String tenantId) {
this.v = v;
this.tenantId = tenantId;
}
public ValidationResult validate(Table t) {
String oldValue = YourThreadLocal.get();
YourThreadLocal.set(tenantId);
try {
return v.validate(t);
} finally {
YourThreadLocal.set(oldValue);
}
}
}
然后你只需包装你的旧验证器,它将在进入时将线程设置为本地并在完成后将其恢复。
我发现以下解决方案无需更改任何基础代码即可运行。基本上,map 方法采用一个功能接口,我将其表示为 lambda 表达式。此表达式添加一个 preExecution 挂钩以在当前 threadlocal 中设置新的 tenantId,并在 postExecution 中清除它。
forkJoinPool.submit(tables.stream()
.parallel()
.map((item) -> {
preExecution(tenantId)
try {
return validator.apply(item);
} finally {
postExecution();
}
}
)
.filter(validationResult ->
validationResult.getValidationMessages()
.size() > 0)
.collect(Collectors.toList())).get();
我们正在使用java8个并行流来处理任务,我们正在通过ForkJoinPool#submit提交任务。我们没有使用 jvm wide ForkJoinPool.commonPool,而是创建了我们自己的自定义池来指定并行度并将其存储为静态变量。
我们有验证框架,我们将表列表置于验证器列表中,我们通过自定义 ForkJoinPool 提交此作业,如下所示:
static ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
.getInstance().getTableValidator();
List<ValidationResult> result = forkJoinPool.submit(
() -> tables.stream()
.parallel()
.map(validator)
.filter(result -> result.getValidationMessages().size() > 0)
.collect(Collectors.toList())).get();
我们遇到的问题是,在下游组件中,运行 在与静态 ForkJoinPool 不同的线程上的各个验证器依赖于 tenant_id,每个请求都不同,并存储在 InheritableThreadLocal 变量中。由于我们正在创建一个静态的 ForkJoinPool,因此 ForkJoinPool 池中的线程只会在第一次创建时继承父线程的值。但是这些池线程不会知道当前请求的新 tenant_id。因此,对于后续执行,这些池线程使用旧的 tenant_id.
我尝试创建自定义 ForkJoinPool 并在构造函数中指定 ForkJoinWorkerThreadFactory 并重写 onStart 方法以提供新的 tenant_id。但这不起作用,因为 onStart 方法仅在创建时调用一次,而不是在单独执行时调用。
似乎我们需要像 ThreadPoolExecutor#beforeExecute 这样的东西,它在 ForkJoinPool 的情况下不可用。那么如果我们想将当前线程本地值传递给静态池线程,我们有什么选择呢?
一种解决方法是为每个请求创建 ForkJoinPool,而不是使其成为静态的,但我们不想这样做,以避免线程创建的昂贵性质。
我们有什么选择?
我认为最好的选择是摆脱本地线程并将其作为参数传递。我知道这可能是一项艰巨的任务。另一种选择是使用包装器。
假设你的验证器有一个验证方法,你可以这样做:
public class WrappingModelValidator implements ModelValidator<Table. ValidationResult> {
private final ModelValidator<Table. ValidationResult> v;
private final String tenantId;
public WrappingModelValidator(ModelValidator<Table. ValidationResult> v, String tenantId) {
this.v = v;
this.tenantId = tenantId;
}
public ValidationResult validate(Table t) {
String oldValue = YourThreadLocal.get();
YourThreadLocal.set(tenantId);
try {
return v.validate(t);
} finally {
YourThreadLocal.set(oldValue);
}
}
}
然后你只需包装你的旧验证器,它将在进入时将线程设置为本地并在完成后将其恢复。
我发现以下解决方案无需更改任何基础代码即可运行。基本上,map 方法采用一个功能接口,我将其表示为 lambda 表达式。此表达式添加一个 preExecution 挂钩以在当前 threadlocal 中设置新的 tenantId,并在 postExecution 中清除它。
forkJoinPool.submit(tables.stream()
.parallel()
.map((item) -> {
preExecution(tenantId)
try {
return validator.apply(item);
} finally {
postExecution();
}
}
)
.filter(validationResult ->
validationResult.getValidationMessages()
.size() > 0)
.collect(Collectors.toList())).get();