处理与循环障碍相关的异常的更好方法

Better ways to handle exceptions related to Cyclic Barriers

我正在尝试将一些业务案例映射到循环障碍的使用。假设我们正在进行促销优惠,并且只有 3 位客户可以获得促销优惠。其他人将不会获得优惠。

为了映射这个场景,我使用了循环障碍。即使代码可以正常工作,我也不确定如何处理某些客户无法获得报价的情况。现在,我尝试将 await() API 与超时值一起使用,这样我就可以捕获 TimeoutException 并让客户知道他无法使用促销优惠.这导致另一个等待线程 BarrierBrokenException

我想知道,我们如何优雅地处理这些情况,以便选定的客户可以享受促销优惠,而那些无法遵循不同代码路径的客户。

我的代码-

public class CyclicBarrierExample {

 public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    Thread[] threads = new Thread[5];
    CyclicBarrier barrier = new CyclicBarrier(3, ()->{System.out.println("Barrier limit of 3 reached. 3 threads will get the promotional offer!");});
    Runnable nr = new PromotionRunnable(barrier);

    int i = 0;
    for (Thread t : threads) {
        t = new Thread(nr, "Thread " + ++i);
        t.start();
    }
    System.out.println("main thread has completed");
 }

 private static class PromotionRunnable implements Runnable {
    private final CyclicBarrier barrier;

    public PromotionRunnable(final CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    /*
     * As per the doc, BrokenBarrierException is thrown when another thread timed out while the current thread was waiting.
     * This explains why we are able to see both Timeout and Broken Barrier Exceptions.
     */
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to get the promotional offer!");
        try {
            barrier.await(2000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        } catch (BrokenBarrierException e) {
            System.out.println(Thread.currentThread().getName() + " could not get the promotional offer, due to barrier exception");
            return;
        } catch (TimeoutException e) {
            System.out.println(Thread.currentThread().getName() + " could not get the promotional offer, due to timeout exception");
            return;
        }
        System.out.println(Thread.currentThread().getName() + " got the promotional offer!");
    }
 }
}

其中一次运行的输出 -

您使用的 CyclicBarrier 不正确。用于多个线程同步。

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

您可能需要使用 Semaphore 虽然我不确定您需要什么。

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

您可能正在寻找 AtomicInteger

An int value that may be updated atomically. See the java.util.concurrent.atomic package specification for description of the properties of atomic variables. An AtomicInteger is used in applications such as atomically incremented counters, and cannot be used as a replacement for an Integer.

A CyclicBarrier 只会在 3 位客户尝试访问该优惠时跳闸。

因此,如果只有 1 位客户尝试访问它,它将阻塞,直到另外 2 位客户也尝试访问它!一旦屏障跳闸,它就会被简单地重置,机制就会重新开始。您可以观察是否创建了 6 个以上的线程而不是 5 个。

所以CyclicBarrier似乎不​​是你要找的。

您可能想计算已经访问该优惠的客户数量并拒绝新客户:

private static class PromotionBarrier {
    private final AtomicBoolean hasAccess = new AtomicBoolean(false);
    private final AtomicLong counter = new AtomicLong(0);
    private final long maxCustomers = 3;
    public boolean hasAccess() {
        if(hasAccess.get()) {
            long value = counter.incrementAndGet();
            if(value <= maxCustomers) {
                return true;
            } else {
                hasAccess.set(false);
                return false;
            }
        }
        return false; 
    }
}

private static class PromotionRunnable implements Runnable {
    private final PromotionBarrier promotionBarrier;

    public PromotionRunnable(final PromotionBarrier promotionBarrier) {
        this.promotionBarrier = barrier;
    }

    @Override
    public void run() {
        if(promotionBarrier.hasAccess()) {
            // Yoohoo I got it!
        } else {
            // Rha I am too late!!
        }
    }

CyclicBarrier 适用于您有多个线程,并且您希望它们同时开始做某事的情况。如果屏障是为N个线程设置的,那么它会让前N-1个线程等待,直到第N个线程到达,然后再让它们全部走。

这可能不是您想要的。您希望前 3 个线程获得奖品,而其余线程空手而归。 CyclicBarrier 是关于让线程 等待 做某事,但没有什么是您希望线程等待的。

Semaphore 也就是让线程等待某事。

我喜欢@OldCurmudgeon 关于使用 AtomicInteger 的建议。

设置一个AtomicInteger等于奖品的数量,然后让每个线程调用ai.decrementAndGet()。如果结果 >= 0,线程可以领取奖品。如果结果 < 0,那么很抱歉,没有奖品了。