并发收集先发生关系

Concurrent collection happen-before relationship

我现在正在学习并发性,并且我尝试编写一个程序来演示使用并发集合时发生的先行关系。 如java.concurrent包中所述:

The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.

我写了下class:

import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class HappensBefore {
    static int      checks      = 0;
    static int      shouldNotHappen   = 0;

    static Deque<Integer> syncList    = new LinkedBlockingDeque<>();
    public static boolean varToHappenBefore = false; //this var must be false when new element added to collection

    static AtomicBoolean  flag        = new AtomicBoolean();

    static class SyncTask implements Runnable {

    int       localTemp = -1;
    private final Thread t   = new Thread(new Counter());

    @Override
    public void run() {
        t.start();
        while (syncList.isEmpty()) { //just skip
        }

        while (true) {
        if (!Thread.interrupted()) {
            if (flag.get()) {
            int r = syncList.peekLast();
            if (r != localTemp) {
                if (varToHappenBefore) {
                shouldNotHappen++;
                }
                varToHappenBefore = true;
                localTemp = r;
                checks++;
            }
            flag.set(false);
            }
        } else {
            t.interrupt();
            break;
        }
        }
    }
    }

    static class Counter implements Runnable {
    int ctr = 0;

    @Override
    public void run() {
        while (!Thread.interrupted()) {

        if (!flag.get()) {
            flag.set(true);
            varToHappenBefore = false;
            syncList.add(ctr++);
        }
        }
    }
    }


    public static void main(String[] args) throws InterruptedException {
    SyncTask st = new SyncTask();
    Thread s = new Thread(st);
    s.start();
    Thread.sleep(10000);//runtime ms

    s.interrupt();
    // s1.interrupt();
    System.out.println("Elems times added: " + checks);
    System.out
        .println("Happens-before violated times: " + shouldNotHappen);
    }
}

我做的是启动线程 1,线程 1 启动线程 2。 Thread1 检查最初设置为 false 的 public boolean varToHappenBefore。当 thread1 从集合中保存新元素时,它将此布尔值设置为 true。在下一个新元素上。如果此布尔值仍然为真,则在发生违规之前发生,并且 shouldNotHappen 递增。

Thread1 检查并发集合是否有新元素,如果有,将其保存在临时变量中并增加总计数器 checks。 然后它切换原子布尔值让 thread2 添加新元素。在添加新元素之前,varToHappenBefore 设置为 false。由于原子布尔标志,thread2 在 thread1 之前没有 运行 代码。但是在添加元素和检查 varToHappenBefore 之前切换线程 2 中的标志,因为这两个操作(elem add 和 boolean toggle)是通过 atomic boolean 同步的。我确保 thread2 运行s 在 thread1 运行s 之后仅执行一次。如果 varToHappenBefore 发生在添加 elem 之前。到线程 2 中的集合,然后在线程 1 中读取(线程 1 检查 varToHappenBefore 仅当从集合中读取新元素时),然后 varToHappenBefore 之后必须保持为 0程序结束。 但我得到下一个结果:

元素添加次数:~10 000 000

happens-before 违规次数:0-10

可能我做错了什么,多线程是微妙而复杂的。希望得到您的帮助。

编辑: 我需要在 thread1 setFlag(false) 为真之后和获取 elem 之前逃避这种情况。从集合中,因为 thread2 然后可以在从集合中获取元素和在线程 1 中设置 varToHappenBefore = true; 之间工作。如果我进行 AtomicBoolean.compareAndSet() 检查块,那么我每 8mils 得到 80k 失败。而且它是可预测和明确的。但是当我没有为 thread2 做这样的 window 在从集合读取和设置布尔值之间添加额外的元素时,当布尔值为真并且新元素出现时,我仍然很少有顺序读取。

即使发现列表尚未更改,您的 SyncTask 也会清除 flag。因此,您可能会错过列表添加事件。这使您的假设不正确:

  1. 在丢失事件后的下一轮,SyncTask 发现列表已更改,它假定 Counter 已完成其任务,检查(成功)并重新装备 varToHappenBefore。但是 Counter 实际上将元素添加到列表 after
  2. 下一轮SyncTask发现列表又变了,假设Counter已经完成任务,检查varToHappenBefore发现没有被清除。这是因为Counter还没有清除

只需将 flag.set(false); 移动到 if (r != localTemp) 块中,一切都会起作用。

最干净的演示会非常简单。让线程 A 创建一些可变 class 的实例,更新它,然后 put(...) 将它添加到队列中。然后,让线程 B 从队列中移除实例,并验证对象的状态是否符合预期。

您的示例分析和修复起来有点复杂。但是,如果您这样做只是为了了解并发收集情况下的 happens-before 契约,那么试试这个程序 -

public class HappensBeforeTest {

private static boolean producerStopped;

private static final Queue<String> queue = new LinkedBlockingQueue<>();
//private static final Queue<String> queue = new LinkedList<>();

public static void main(String[] args) {
    new Thread(new Producer()).start();
    new Thread(new Consumer()).start();
}

private static class Producer implements Runnable {

    public void run() {
        int count = 0;
        while (count++ < 10) {
            producerStopped = (count == 10);
            queue.add(String.valueOf(count));
        }
        System.out.println("Producer finished");
    }
}

private static class Consumer implements Runnable {

    public void run() {
        while (!producerStopped) {
            queue.poll();
        }
        System.out.println("Consumer finished");
    }
}

}

queueLinkedBlockingQueue 时,线程 ProducerConsumer 都完成。

但是,当 queueLinkedList 时,有时 Consumer 不会完成并保留 运行,因为 producerStopped 标志仍然可见,对于 Consumer 即使 Producer 已将其更新为 true。