并发收集先发生关系
Concurrent collection happen-before relationship
我现在正在学习并发性,并且我尝试编写一个程序来演示使用并发集合时发生的先行关系。
如java.concurrent包中所述:
The methods of all classes in java.util.concurrent and its subpackages
extend these guarantees to higher-level synchronization. In
particular: Actions in a thread prior to placing an object into any
concurrent collection happen-before actions subsequent to the access
or removal of that element from the collection in another thread.
我写了下class:
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class HappensBefore {
static int checks = 0;
static int shouldNotHappen = 0;
static Deque<Integer> syncList = new LinkedBlockingDeque<>();
public static boolean varToHappenBefore = false; //this var must be false when new element added to collection
static AtomicBoolean flag = new AtomicBoolean();
static class SyncTask implements Runnable {
int localTemp = -1;
private final Thread t = new Thread(new Counter());
@Override
public void run() {
t.start();
while (syncList.isEmpty()) { //just skip
}
while (true) {
if (!Thread.interrupted()) {
if (flag.get()) {
int r = syncList.peekLast();
if (r != localTemp) {
if (varToHappenBefore) {
shouldNotHappen++;
}
varToHappenBefore = true;
localTemp = r;
checks++;
}
flag.set(false);
}
} else {
t.interrupt();
break;
}
}
}
}
static class Counter implements Runnable {
int ctr = 0;
@Override
public void run() {
while (!Thread.interrupted()) {
if (!flag.get()) {
flag.set(true);
varToHappenBefore = false;
syncList.add(ctr++);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
SyncTask st = new SyncTask();
Thread s = new Thread(st);
s.start();
Thread.sleep(10000);//runtime ms
s.interrupt();
// s1.interrupt();
System.out.println("Elems times added: " + checks);
System.out
.println("Happens-before violated times: " + shouldNotHappen);
}
}
我做的是启动线程 1,线程 1 启动线程 2。
Thread1 检查最初设置为 false 的 public boolean varToHappenBefore。当 thread1 从集合中保存新元素时,它将此布尔值设置为 true。在下一个新元素上。如果此布尔值仍然为真,则在发生违规之前发生,并且 shouldNotHappen 递增。
Thread1 检查并发集合是否有新元素,如果有,将其保存在临时变量中并增加总计数器 checks。
然后它切换原子布尔值让 thread2 添加新元素。在添加新元素之前,varToHappenBefore 设置为 false。由于原子布尔标志,thread2 在 thread1 之前没有 运行 代码。但是在添加元素和检查 varToHappenBefore 之前切换线程 2 中的标志,因为这两个操作(elem add 和 boolean toggle)是通过 atomic boolean 同步的。我确保 thread2 运行s 在 thread1 运行s 之后仅执行一次。如果 varToHappenBefore 发生在添加 elem 之前。到线程 2 中的集合,然后在线程 1 中读取(线程 1 检查 varToHappenBefore 仅当从集合中读取新元素时),然后 varToHappenBefore 之后必须保持为 0程序结束。
但我得到下一个结果:
元素添加次数:~10 000 000
happens-before 违规次数:0-10
可能我做错了什么,多线程是微妙而复杂的。希望得到您的帮助。
编辑:
我需要在 thread1 setFlag(false)
为真之后和获取 elem 之前逃避这种情况。从集合中,因为 thread2 然后可以在从集合中获取元素和在线程 1 中设置 varToHappenBefore = true;
之间工作。如果我进行 AtomicBoolean.compareAndSet()
检查块,那么我每 8mils 得到 80k 失败。而且它是可预测和明确的。但是当我没有为 thread2 做这样的 window 在从集合读取和设置布尔值之间添加额外的元素时,当布尔值为真并且新元素出现时,我仍然很少有顺序读取。
即使发现列表尚未更改,您的 SyncTask
也会清除 flag
。因此,您可能会错过列表添加事件。这使您的假设不正确:
- 在丢失事件后的下一轮,
SyncTask
发现列表已更改,它假定 Counter
已完成其任务,检查(成功)并重新装备 varToHappenBefore
。但是 Counter
实际上将元素添加到列表 after
- 下一轮
SyncTask
发现列表又变了,假设Counter
已经完成任务,检查varToHappenBefore
发现没有被清除。这是因为Counter
还没有清除
只需将 flag.set(false);
移动到 if (r != localTemp)
块中,一切都会起作用。
最干净的演示会非常简单。让线程 A 创建一些可变 class 的实例,更新它,然后 put(...)
将它添加到队列中。然后,让线程 B 从队列中移除实例,并验证对象的状态是否符合预期。
您的示例分析和修复起来有点复杂。但是,如果您这样做只是为了了解并发收集情况下的 happens-before 契约,那么试试这个程序 -
public class HappensBeforeTest {
private static boolean producerStopped;
private static final Queue<String> queue = new LinkedBlockingQueue<>();
//private static final Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
private static class Producer implements Runnable {
public void run() {
int count = 0;
while (count++ < 10) {
producerStopped = (count == 10);
queue.add(String.valueOf(count));
}
System.out.println("Producer finished");
}
}
private static class Consumer implements Runnable {
public void run() {
while (!producerStopped) {
queue.poll();
}
System.out.println("Consumer finished");
}
}
}
当 queue
为 LinkedBlockingQueue
时,线程 Producer
和 Consumer
都完成。
但是,当 queue
为 LinkedList
时,有时 Consumer
不会完成并保留 运行,因为 producerStopped
标志仍然可见,对于 Consumer
即使 Producer
已将其更新为 true。
我现在正在学习并发性,并且我尝试编写一个程序来演示使用并发集合时发生的先行关系。 如java.concurrent包中所述:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
我写了下class:
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class HappensBefore {
static int checks = 0;
static int shouldNotHappen = 0;
static Deque<Integer> syncList = new LinkedBlockingDeque<>();
public static boolean varToHappenBefore = false; //this var must be false when new element added to collection
static AtomicBoolean flag = new AtomicBoolean();
static class SyncTask implements Runnable {
int localTemp = -1;
private final Thread t = new Thread(new Counter());
@Override
public void run() {
t.start();
while (syncList.isEmpty()) { //just skip
}
while (true) {
if (!Thread.interrupted()) {
if (flag.get()) {
int r = syncList.peekLast();
if (r != localTemp) {
if (varToHappenBefore) {
shouldNotHappen++;
}
varToHappenBefore = true;
localTemp = r;
checks++;
}
flag.set(false);
}
} else {
t.interrupt();
break;
}
}
}
}
static class Counter implements Runnable {
int ctr = 0;
@Override
public void run() {
while (!Thread.interrupted()) {
if (!flag.get()) {
flag.set(true);
varToHappenBefore = false;
syncList.add(ctr++);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
SyncTask st = new SyncTask();
Thread s = new Thread(st);
s.start();
Thread.sleep(10000);//runtime ms
s.interrupt();
// s1.interrupt();
System.out.println("Elems times added: " + checks);
System.out
.println("Happens-before violated times: " + shouldNotHappen);
}
}
我做的是启动线程 1,线程 1 启动线程 2。 Thread1 检查最初设置为 false 的 public boolean varToHappenBefore。当 thread1 从集合中保存新元素时,它将此布尔值设置为 true。在下一个新元素上。如果此布尔值仍然为真,则在发生违规之前发生,并且 shouldNotHappen 递增。
Thread1 检查并发集合是否有新元素,如果有,将其保存在临时变量中并增加总计数器 checks。 然后它切换原子布尔值让 thread2 添加新元素。在添加新元素之前,varToHappenBefore 设置为 false。由于原子布尔标志,thread2 在 thread1 之前没有 运行 代码。但是在添加元素和检查 varToHappenBefore 之前切换线程 2 中的标志,因为这两个操作(elem add 和 boolean toggle)是通过 atomic boolean 同步的。我确保 thread2 运行s 在 thread1 运行s 之后仅执行一次。如果 varToHappenBefore 发生在添加 elem 之前。到线程 2 中的集合,然后在线程 1 中读取(线程 1 检查 varToHappenBefore 仅当从集合中读取新元素时),然后 varToHappenBefore 之后必须保持为 0程序结束。 但我得到下一个结果:
元素添加次数:~10 000 000
happens-before 违规次数:0-10
可能我做错了什么,多线程是微妙而复杂的。希望得到您的帮助。
编辑:
我需要在 thread1 setFlag(false)
为真之后和获取 elem 之前逃避这种情况。从集合中,因为 thread2 然后可以在从集合中获取元素和在线程 1 中设置 varToHappenBefore = true;
之间工作。如果我进行 AtomicBoolean.compareAndSet()
检查块,那么我每 8mils 得到 80k 失败。而且它是可预测和明确的。但是当我没有为 thread2 做这样的 window 在从集合读取和设置布尔值之间添加额外的元素时,当布尔值为真并且新元素出现时,我仍然很少有顺序读取。
即使发现列表尚未更改,您的 SyncTask
也会清除 flag
。因此,您可能会错过列表添加事件。这使您的假设不正确:
- 在丢失事件后的下一轮,
SyncTask
发现列表已更改,它假定Counter
已完成其任务,检查(成功)并重新装备varToHappenBefore
。但是Counter
实际上将元素添加到列表 after - 下一轮
SyncTask
发现列表又变了,假设Counter
已经完成任务,检查varToHappenBefore
发现没有被清除。这是因为Counter
还没有清除
只需将 flag.set(false);
移动到 if (r != localTemp)
块中,一切都会起作用。
最干净的演示会非常简单。让线程 A 创建一些可变 class 的实例,更新它,然后 put(...)
将它添加到队列中。然后,让线程 B 从队列中移除实例,并验证对象的状态是否符合预期。
您的示例分析和修复起来有点复杂。但是,如果您这样做只是为了了解并发收集情况下的 happens-before 契约,那么试试这个程序 -
public class HappensBeforeTest {
private static boolean producerStopped;
private static final Queue<String> queue = new LinkedBlockingQueue<>();
//private static final Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
private static class Producer implements Runnable {
public void run() {
int count = 0;
while (count++ < 10) {
producerStopped = (count == 10);
queue.add(String.valueOf(count));
}
System.out.println("Producer finished");
}
}
private static class Consumer implements Runnable {
public void run() {
while (!producerStopped) {
queue.poll();
}
System.out.println("Consumer finished");
}
}
}
当 queue
为 LinkedBlockingQueue
时,线程 Producer
和 Consumer
都完成。
但是,当 queue
为 LinkedList
时,有时 Consumer
不会完成并保留 运行,因为 producerStopped
标志仍然可见,对于 Consumer
即使 Producer
已将其更新为 true。