ConcurrentBitSet 中的竞争条件

Race condition in ConcurrentBitSet

我正在尝试在 java 中创建并发位集,它允许大小扩展(与固定长度相反,with 非常简单)。下面是核心部分class(其他方法暂时不重要)

public class ConcurrentBitSet {

static final int CELL = 32;
static final int MASK = CELL - 1;
final AtomicReference<AtomicIntegerArray> aiaRef;

public ConcurrentBitSet(int initialBitSize) {
    aiaRef = new AtomicReference<>(new AtomicIntegerArray(1 + ((initialBitSize - 1) / CELL)));
}

public boolean get(int bit) {
    int cell = bit / CELL;
    if (cell >= aiaRef.get().length()) {
        return false;
    }
    int mask = 1 << (bit & MASK);
    return (aiaRef.get().get(cell) & mask) != 0;
}

public void set(int bit) {
    int cell = bit / CELL;
    int mask = 1 << (bit & MASK);
    while (true) {
        AtomicIntegerArray old = aiaRef.get();
        AtomicIntegerArray v = extend(old, cell);
        v.getAndAccumulate(cell, mask, (prev, m) -> prev | m);
        if (aiaRef.compareAndSet(old, v)) {
            break;
        }
    }
}

private AtomicIntegerArray extend(AtomicIntegerArray old, int cell) {
    AtomicIntegerArray v = old;
    if (cell >= v.length()) {
        v = new AtomicIntegerArray(cell + 1);
        for (int i = 0; i < old.length(); i++) {
            v.set(i, old.get(i));
        }
    }
    return v;
}

public String toString() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < aiaRef.get().length(); i++) {
        for (int b = 0; b < CELL; b++) {
            sb.append(get(i * CELL + b) ? '1' : '0');
        }
    }
    return sb.toString();
}

}

不幸的是,这里似乎存在竞争条件。

这里是示例测试代码,它每次都失败了几个位——它应该打印所有的 1,直到第 300 位,但那里几乎没有随机零,每次都在不同的地方。我得到的一台 PC 很少,另一台在行的 odd/even 个位置上有 8-10 个零。

    final ConcurrentBitSet cbs = new ConcurrentBitSet(10);
    CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
        public void run() {
            try {
                latch.await();
                for (int i = 0; i < 300; i += 2) {
                    cbs.set(i);
                }
            } catch (InterruptedException e) {

            }
        };
    }.start();
    new Thread() {
        public void run() {
            try {
                latch.await();
                for (int i = 0; i < 300; i += 2) {
                    cbs.set(i + 1);
                }
            } catch (InterruptedException e) {
            }
        };
    }.start();
    latch.countDown();
    Thread.sleep(1000);
    System.out.println(cbs.toString());

我得到的例子 11111111111111111111111111111111111111111111111111111101111111111111111111111111011111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111101111111111111111111111111111111111111111111111111111111111111011111111111111111111111111111111111111111111111111100000000000000000000 11111111111111111111111111111111111111110111111111111111111111110101111111111111111111110101011111111111111111111111111111111111010101111111111111111111111111111111111101010111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111100000000000000000000

很难调试(因为任何复杂的东西都会使竞争条件消失),但看起来在两个线程同时尝试扩展数组大小的时候,一个线程失败并必须重试while 循环在循环的下一部分以来自 aiaRef.get() 的损坏数据结束 - 它已经访问过的部分(如果他们试图扩展它,其他线程也是如此)最终在内部有一些零。

有人知道错误在哪里吗?

问题是 aiaRef.compareAndSet() 只是防止并发 替换 ,因为 AtomicReference 唯一的工作是保护其引用的原子性.如果在我们重建数组时引用的对象同时被 modifiedcompareAndSet() 将成功,因为它正在将相同的引用与自身进行比较,但它可能遗漏了一些修改。