如何避免 forkjoin 延续引起的线程局部损坏
how to avoid threadlocal corruption from forkjoin continuation
This question is NOT about how to use a ThreadLocal. My question is
about the side effect of the ForkJoinPool continuation of ForkJoinTask.compute() which breaks the ThreadLocal contract.
在一个ForkJoinTask.compute()
中,我拉取了一个任意的静态ThreadLocal。
该值是某个任意有状态对象,但在 compute()
调用结束后没有状态。换句话说,我准备了threadlocal object/state,使用它,然后处理。
原则上你会把那个状态放在 ForkJoinTasK 中,但假设这个线程本地值在我无法更改的第 3 方库中。因此静态线程本地,因为它是所有任务实例将共享的资源。
当然,我预计、测试并证明了简单的 ThreadLocal 只会初始化一次。这意味着由于 ForkJoinTask.join()
调用下的线程继续,我的 compute()
方法甚至可以在退出之前再次调用。 这暴露了上一个计算调用中使用的对象的状态,堆栈帧更高。
您如何解决不良曝光问题?
我目前看到的唯一方法是确保每次 compute()
调用都有新线程,但这会破坏 F/J 池延续并可能危险地增加线程数。
在 JRE 核心中没有什么可做的来备份自第一个 ForkJoinTask 以来更改的 TL 并恢复整个线程本地映射,就好像每个 task.compute 都是第一个到 运行线程?
谢谢。
package jdk8tests;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class TestForkJoin3 {
static AtomicInteger nextId = new AtomicInteger();
static long T0 = System.currentTimeMillis();
static int NTHREADS = 5;
static final ThreadLocal<StringBuilder> myTL = ThreadLocal.withInitial( () -> new StringBuilder());
static void log(Object msg) {
System.out.format("%09.3f %-10s %s%n", new Double(0.001*(System.currentTimeMillis()-T0)), Thread.currentThread().getName(), " : "+msg);
}
public static void main(String[] args) throws Exception {
ForkJoinPool p = new ForkJoinPool(
NTHREADS,
pool -> {
int id = nextId.incrementAndGet(); //count new threads
log("new FJ thread "+ id);
ForkJoinWorkerThread t = new ForkJoinWorkerThread(pool) {/**/};
t.setName("My FJThread "+id);
return t;
},
Thread.getDefaultUncaughtExceptionHandler(),
false
);
LowercasingTask t = new LowercasingTask("ROOT", 3);
p.invoke(t);
int nt = nextId.get();
log("number of threads was "+nt);
if(nt > NTHREADS)
log(">>>>>>> more threads than prescribed <<<<<<<<");
}
//=====================
static class LowercasingTask extends RecursiveTask<String> {
String name;
int level;
public LowercasingTask(String name, int level) {
this.name = name;
this.level = level;
}
@Override
protected String compute() {
StringBuilder sbtl = myTL.get();
String initialValue = sbtl.toString();
if(!initialValue.equals(""))
log("!!!!!! BROKEN ON START!!!!!!! value = "+ initialValue);
sbtl.append(":START");
if(level>0) {
log(name+": compute level "+level);
try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
List<LowercasingTask> tasks = new ArrayList<>();
for(int i=1; i<=9; i++) {
LowercasingTask lt = new LowercasingTask(name+"."+i, level-1);
tasks.add(lt);
lt.fork();
}
for(int i=0; i<tasks.size(); i++) { //this can lead to compensation threads due to l1.join() method running lifo task lN
//for(int i=tasks.size()-1; i>=0; i--) { //this usually has the lN.join() method running task lN, without compensation threads.
tasks.get(i).join();
}
log(name+": returning from joins");
}
sbtl.append(":END");
String val = sbtl.toString();
if(!val.equals(":START:END"))
log("!!!!!! BROKEN AT END !!!!!!! value = "+val);
sbtl.setLength(0);
return "done";
}
}
}
我不这么认为。一般情况下不会,特别是 ForkJoinTask
任务预计是孤立对象上的纯函数。
有时可以将任务的顺序更改为在开始和自己的任务工作之前分叉和加入。这样子任务将在返回之前初始化并处理线程本地。如果那不可能,也许您可以将线程本地视为一个堆栈,并推送、清除和恢复每个连接周围的值。
This question is NOT about how to use a ThreadLocal. My question is about the side effect of the ForkJoinPool continuation of ForkJoinTask.compute() which breaks the ThreadLocal contract.
在一个ForkJoinTask.compute()
中,我拉取了一个任意的静态ThreadLocal。
该值是某个任意有状态对象,但在 compute()
调用结束后没有状态。换句话说,我准备了threadlocal object/state,使用它,然后处理。
原则上你会把那个状态放在 ForkJoinTasK 中,但假设这个线程本地值在我无法更改的第 3 方库中。因此静态线程本地,因为它是所有任务实例将共享的资源。
当然,我预计、测试并证明了简单的 ThreadLocal 只会初始化一次。这意味着由于 ForkJoinTask.join()
调用下的线程继续,我的 compute()
方法甚至可以在退出之前再次调用。 这暴露了上一个计算调用中使用的对象的状态,堆栈帧更高。
您如何解决不良曝光问题?
我目前看到的唯一方法是确保每次 compute()
调用都有新线程,但这会破坏 F/J 池延续并可能危险地增加线程数。
在 JRE 核心中没有什么可做的来备份自第一个 ForkJoinTask 以来更改的 TL 并恢复整个线程本地映射,就好像每个 task.compute 都是第一个到 运行线程?
谢谢。
package jdk8tests;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class TestForkJoin3 {
static AtomicInteger nextId = new AtomicInteger();
static long T0 = System.currentTimeMillis();
static int NTHREADS = 5;
static final ThreadLocal<StringBuilder> myTL = ThreadLocal.withInitial( () -> new StringBuilder());
static void log(Object msg) {
System.out.format("%09.3f %-10s %s%n", new Double(0.001*(System.currentTimeMillis()-T0)), Thread.currentThread().getName(), " : "+msg);
}
public static void main(String[] args) throws Exception {
ForkJoinPool p = new ForkJoinPool(
NTHREADS,
pool -> {
int id = nextId.incrementAndGet(); //count new threads
log("new FJ thread "+ id);
ForkJoinWorkerThread t = new ForkJoinWorkerThread(pool) {/**/};
t.setName("My FJThread "+id);
return t;
},
Thread.getDefaultUncaughtExceptionHandler(),
false
);
LowercasingTask t = new LowercasingTask("ROOT", 3);
p.invoke(t);
int nt = nextId.get();
log("number of threads was "+nt);
if(nt > NTHREADS)
log(">>>>>>> more threads than prescribed <<<<<<<<");
}
//=====================
static class LowercasingTask extends RecursiveTask<String> {
String name;
int level;
public LowercasingTask(String name, int level) {
this.name = name;
this.level = level;
}
@Override
protected String compute() {
StringBuilder sbtl = myTL.get();
String initialValue = sbtl.toString();
if(!initialValue.equals(""))
log("!!!!!! BROKEN ON START!!!!!!! value = "+ initialValue);
sbtl.append(":START");
if(level>0) {
log(name+": compute level "+level);
try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
List<LowercasingTask> tasks = new ArrayList<>();
for(int i=1; i<=9; i++) {
LowercasingTask lt = new LowercasingTask(name+"."+i, level-1);
tasks.add(lt);
lt.fork();
}
for(int i=0; i<tasks.size(); i++) { //this can lead to compensation threads due to l1.join() method running lifo task lN
//for(int i=tasks.size()-1; i>=0; i--) { //this usually has the lN.join() method running task lN, without compensation threads.
tasks.get(i).join();
}
log(name+": returning from joins");
}
sbtl.append(":END");
String val = sbtl.toString();
if(!val.equals(":START:END"))
log("!!!!!! BROKEN AT END !!!!!!! value = "+val);
sbtl.setLength(0);
return "done";
}
}
}
我不这么认为。一般情况下不会,特别是 ForkJoinTask
任务预计是孤立对象上的纯函数。
有时可以将任务的顺序更改为在开始和自己的任务工作之前分叉和加入。这样子任务将在返回之前初始化并处理线程本地。如果那不可能,也许您可以将线程本地视为一个堆栈,并推送、清除和恢复每个连接周围的值。