能否使用 fork/join 跨线程边界安全地移植非线程安全值?
Can a non-thread-safe value be safely ported across thread boundaries using fork/join?
我有一些 class 不是线程安全的:
class ThreadUnsafeClass {
long i;
long incrementAndGet() { return ++i; }
}
(我在这里使用 long
作为字段,但我们应该将其字段视为某种线程不安全类型)。
我现在有一个 class 看起来像这样
class Foo {
final ThreadUnsafeClass c;
Foo(ThreadUnsafeClass c) {
this.c = c;
}
}
即线程不安全class是它的一个final字段。现在我要这样做:
public class JavaMM {
public static void main(String[] args) {
final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
ThreadUnsafeClass t = new ThreadUnsafeClass();
t.incrementAndGet();
return new FC(t);
});
assert (work.fork().join().c.i == 1);
}
}
也就是说,从线程 T
(主),我在 T'
(fork-join-pool)上调用了一些工作,它创建并改变了我的不安全实例 class 然后 returns 结果包裹在 Foo
中。请注意,我的线程不安全的所有突变 class 都发生在单个线程上,T'
。
问题 1:我是否保证线程不安全实例的结束状态-class 安全地移植到 T' ~> T
join
?
处的线程边界
问题 2:如果我使用并行流完成此操作会怎样?例如:
Map<Long, Foo> results =
Stream
.of(new ThreadUnsafeClass())
.parallel()
.map(tuc -> {
tuc.incrementAndGet();
return new Foo(tuc);
})
.collect(
Collectors.toConcurrentMap(
foo -> foo.c.i,
Function.identity();
)
);
assert(results.get(1) != null)
我认为 ForkJoinTask.join()
具有与 Future.get()
相同的记忆效果(因为它在 join()
Javadoc 中说基本上是 get()
,但有中断和异常差异)。而 Future.get()
是 specified as:
Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.
换句话说,这基本上是 "safe publication" 通过 Future
/FJT
。这意味着,执行程序线程所做的并通过 FJT
结果发布的任何内容都对 FJT.join()
用户可见。由于该示例仅在执行程序线程内分配对象并填充其字段,并且对象从执行程序返回后没有任何反应,因此我们只能看到执行程序线程产生的值。
请注意,通过 final
放置整个内容不会给它带来任何额外的好处。即使你只是做普通的野外存储,你仍然可以保证:
public static void main(String... args) throws Exception {
ExecutorService s = Executors.newCachedThreadPool();
Future<MyObject> f = s.submit(() -> new MyObject(42));
assert (f.get().x == 42); // guaranteed!
s.shutdown();
}
public class MyObject {
int x;
public MyObject(int x) { this.x = x; }
}
但是请注意,在 Stream
示例中(如果我们假设 Stream.of.parallel
和 Executor.submit
之间以及 Stream.collect
和 FJT.join
/Future.get
),你已经在调用者线程中创建了对象,然后将它传递给执行者来做一些事情。这是一个细微的差别,但仍然没有太大关系,因为我们在提交时也有 HB,这阻止了看到对象的旧状态:
public static void main(String... args) throws Exception {
ExecutorService s = Executors.newCachedThreadPool();
MyObject o = new MyObject(42);
Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
f.get(); // get -->hb--> read o.x
assert (o.x == 43); // guaranteed
s.shutdown();
}
public static class MyObject {
int x;
public MyObject(int x) { this.x = x; }
}
(正式地说,那是因为来自 read(o.x)
的所有 HB 路径都经过了 store(o.x, 43)
执行线程的操作)
我有一些 class 不是线程安全的:
class ThreadUnsafeClass {
long i;
long incrementAndGet() { return ++i; }
}
(我在这里使用 long
作为字段,但我们应该将其字段视为某种线程不安全类型)。
我现在有一个 class 看起来像这样
class Foo {
final ThreadUnsafeClass c;
Foo(ThreadUnsafeClass c) {
this.c = c;
}
}
即线程不安全class是它的一个final字段。现在我要这样做:
public class JavaMM {
public static void main(String[] args) {
final ForkJoinTask<ThreadUnsafeClass> work = ForkJoinTask.adapt(() -> {
ThreadUnsafeClass t = new ThreadUnsafeClass();
t.incrementAndGet();
return new FC(t);
});
assert (work.fork().join().c.i == 1);
}
}
也就是说,从线程 T
(主),我在 T'
(fork-join-pool)上调用了一些工作,它创建并改变了我的不安全实例 class 然后 returns 结果包裹在 Foo
中。请注意,我的线程不安全的所有突变 class 都发生在单个线程上,T'
。
问题 1:我是否保证线程不安全实例的结束状态-class 安全地移植到 T' ~> T
join
?
问题 2:如果我使用并行流完成此操作会怎样?例如:
Map<Long, Foo> results =
Stream
.of(new ThreadUnsafeClass())
.parallel()
.map(tuc -> {
tuc.incrementAndGet();
return new Foo(tuc);
})
.collect(
Collectors.toConcurrentMap(
foo -> foo.c.i,
Function.identity();
)
);
assert(results.get(1) != null)
我认为 ForkJoinTask.join()
具有与 Future.get()
相同的记忆效果(因为它在 join()
Javadoc 中说基本上是 get()
,但有中断和异常差异)。而 Future.get()
是 specified as:
Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.
换句话说,这基本上是 "safe publication" 通过 Future
/FJT
。这意味着,执行程序线程所做的并通过 FJT
结果发布的任何内容都对 FJT.join()
用户可见。由于该示例仅在执行程序线程内分配对象并填充其字段,并且对象从执行程序返回后没有任何反应,因此我们只能看到执行程序线程产生的值。
请注意,通过 final
放置整个内容不会给它带来任何额外的好处。即使你只是做普通的野外存储,你仍然可以保证:
public static void main(String... args) throws Exception {
ExecutorService s = Executors.newCachedThreadPool();
Future<MyObject> f = s.submit(() -> new MyObject(42));
assert (f.get().x == 42); // guaranteed!
s.shutdown();
}
public class MyObject {
int x;
public MyObject(int x) { this.x = x; }
}
但是请注意,在 Stream
示例中(如果我们假设 Stream.of.parallel
和 Executor.submit
之间以及 Stream.collect
和 FJT.join
/Future.get
),你已经在调用者线程中创建了对象,然后将它传递给执行者来做一些事情。这是一个细微的差别,但仍然没有太大关系,因为我们在提交时也有 HB,这阻止了看到对象的旧状态:
public static void main(String... args) throws Exception {
ExecutorService s = Executors.newCachedThreadPool();
MyObject o = new MyObject(42);
Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
f.get(); // get -->hb--> read o.x
assert (o.x == 43); // guaranteed
s.shutdown();
}
public static class MyObject {
int x;
public MyObject(int x) { this.x = x; }
}
(正式地说,那是因为来自 read(o.x)
的所有 HB 路径都经过了 store(o.x, 43)
执行线程的操作)