创建事件调度线程安全信号量

Creating an Event Dispatch Thread safe semaphore

我一直在尝试制作一个二进制信号量,它能够安全地阻止事件分派线程 (EDT) 上方法 运行 的执行,而不会实际阻止线程处理更多事件。乍一看这似乎是不可能的,但是 Java 有一些与此相关的内置功能,但我不能完全让它工作。

用例

目前,如果您从 EDT 显示模态摆动对话框,它似乎会阻止 EDT(因为显示模态对话框的方法在对话框关闭之前不会继续到下一行),但实际上有一些幕后魔法使 EDT 进入一个新的事件循环,它将继续调度事件直到模式对话框关闭。

我的团队目前有一些应用程序正在从 swing 非常缓慢地迁移到 JavaFX(有点棘手的过渡),我希望能够显示来自 AWT 事件的模态 JavaFX 对话框以可以显示 swing 模态对话框的相同方式调度线程。似乎拥有某种 EDT 安全信号量可以满足这个用例,并且可能会在未来的其他用途中派上用场。

方法

java.awt.EventQueue.createSecondaryLoop() 是一种创建 SecondaryLoop 对象的方法,您可以使用该对象启动新的事件处理循环。当您调用 SecondaryLoop.enter() 时,调用将在处理新的事件循环时阻塞(请注意 call 阻塞,但 thread未被阻止,因为它在事件处理循环中继续)。新的事件循环将继续,直到您调用 SecondaryLoop.exit()(这不完全正确,请参阅我的 )。

所以我创建了一个信号量,其中阻塞调用 acquire 导致等待普通线程的锁存器,或者进入 EDT 的辅助循环。每个对 acquire 的阻塞调用还会添加一个解除阻塞操作,以便在释放信号量时调用(对于普通线程,它只是递减锁存器,对于 EDT,它退出二级循环)。

这是我的代码:


import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore{

    /** Operations used to unblock threads when a semaphore is released.
     * Must be a stack because secondary loops have to be exited in the
     * reverse of the order in which they were entered in order to unblock
     * the execution of the method that entered the loop.
     */
    private Stack<Runnable> releaseOperations = new Stack<>();

    private boolean semaphoreAlreadyAcquired = false;


    public EventDispatchThreadSafeBinarySemaphore() {
        super(0);
    }

    @Override
    public boolean isFair() {
        return false;
    }

    @Override
    public void acquire() throws InterruptedException {

        Runnable blockingOperation = () -> {};

        synchronized(this) {
            if(semaphoreAlreadyAcquired) {

                //We didn't acquire the semaphore, need to set up an operation to execute
                //while we're waiting on the semaphore and an operation for another thread
                //to execute in order to unblock us when the semaphore becomes available

                if(EventQueue.isDispatchThread()) {

                    //For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
                    //processing AWT events.
                    SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();

                    releaseOperations.add(() -> temporaryAwtLoop.exit());

                    blockingOperation = () -> {

                        if(!temporaryAwtLoop.enter()) {
                            //I don't think we'll run into this, but I'm leaving this here for now for debug purposes
                            System.err.println("Failed to enter event loop");
                        }
                    };
                }
                else {

                    //Non-dispatch thread is a little simpler, we'll just wait on a latch
                    CountDownLatch blockedLatch = new CountDownLatch(1);
                    releaseOperations.add(() -> blockedLatch.countDown());
                    blockingOperation = () -> {
                        try {
                            blockedLatch.await();
                        } catch (InterruptedException e) {
                            //I'll worry about handling this better once I have the basics figured out
                            e.printStackTrace();
                        }
                    };
                }
            }
            else {
                semaphoreAlreadyAcquired = true;
            }
        }

        //This part must be executed outside of the synchronized block so that we don't block
        //the EDT if it tries to acquire the semaphore while this statement is blocked
        blockingOperation.run();

    }

    @Override
    public void release() {
        synchronized(this) {
            if(releaseOperations.size() > 0) {
                //Release the last blocked thread
                releaseOperations.pop().run();
            }
            else {
                semaphoreAlreadyAcquired = false;
            }
        }
    }

}

这是我的相关 JUnit 测试代码(我为较大的尺寸道歉,这是迄今为止我能够提出的最小的最小可验证示例):

public class TestEventDispatchThreadSafeBinarySemaphore {

    private static EventDispatchThreadSafeBinarySemaphore semaphore;
        //See 
        //for why we need this timer
        private static Timer timer = new Timer(500, null);
        @BeforeClass
    public static void setupClass() {
        timer.start();
    }

    @Before
    public void setup() {
        semaphore = new EventDispatchThreadSafeBinarySemaphore();
    }
        @AfterClass
    public static void cleanupClass() {
        timer.stop();
    }

        //This test passes just fine
        @Test(timeout = 1000)
    public void testBlockingAcquireReleaseOnEDT() throws InterruptedException {

        semaphore.acquire();

        CountDownLatch edtCodeStarted = new CountDownLatch(1);
        CountDownLatch edtCodeFinished = new CountDownLatch(1);

        SwingUtilities.invokeLater(() -> {
            //One countdown to indicate that this has begun running
            edtCodeStarted.countDown();
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            //This countdown indicates that it has finished running
            edtCodeFinished.countDown();

        });

        //Ensure that the code on the EDT has started
        edtCodeStarted.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        //Ensure that things can still run on the EDT
        CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
        SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());

        //If we get past this line, then we know that the EDT is live even though the 
        //code in the invokeLater call is blocked
        edtActiveCheckingLatch.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        semaphore.release();

        //If we get past this line, then the code on the EDT got past the semaphore
        edtCodeFinished.await();
    }

        //This test fails intermittently, but so far only after the previous test was run first
    @Test(timeout = 10000)
    public void testConcurrentAcquiresOnEDT() throws InterruptedException {

        int numThreads =100;

        CountDownLatch doneLatch = new CountDownLatch(numThreads);

        try {
            semaphore.acquire();

            //Queue up a bunch of threads to acquire and release the semaphore
            //as soon as it becomes available
            IntStream.range(0, numThreads)
                    .parallel()
                    .forEach((threadNumber) -> 
                        SwingUtilities.invokeLater(() -> {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            finally {
                                semaphore.release();
                                //Count down the latch to indicate that the thread terminated
                                doneLatch.countDown();
                            }
                        })
                    );

            semaphore.release();

            doneLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

问题

testConcurrentAcquiresOnEDT有时会通过有时会失败。我相信我知道为什么。我深入研究了 Java 源代码,在 WaitDispatchSupportSecondaryLoop 的具体实现)中,循环基本上继续调度事件,直到清除名为 keepBlockingEDT 的标志。它将在事件之间进行检查。当我调用 exit 时,它将清除该标志并发送一个事件以唤醒事件队列,以防它正在等待更多事件。但是,它不会导致 enter() 方法立即退出(而且我不认为有可能)。

所以这是死锁的结果:

我已经为此工作了几天,我提出的每一个想法都是死胡同。

我相信我有一个可能的部分解决方案,就是不允许一个以上的线程一次阻塞在信号量上(如果另一个线程试图获取它,我将抛出一个 IllegalStateException ).如果它们每个都使用自己的信号量,我仍然可以进行多个辅助循环,但每个信号量最多会创建 1 个辅助循环。我认为这会起作用,并且它会很好地满足我最可能的用例(因为大多数情况下我只想显示来自事件线程的单个 JavaFX 模式对话框)。我只是想知道是否还有其他人有其他想法,因为我觉得我快要做出一些很酷的东西了,但就是不太行。

如果您有任何想法,请告诉我。 "I'm pretty sure this is impossible and here's why..." 也是一个可以接受的答案。

使用 Semaphore 很可能不是正确的方法。你想要的是进入嵌套事件循环,而不是使用阻塞机制。从阅读 API 来看,你似乎也把事情复杂化了。同样,您只需在一个 UI 线程上进入嵌套事件循环,然后在另一个 UI 线程完成其工作后退出该循环。我相信以下符合您的要求:

import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javafx.application.Platform;
import javax.swing.SwingUtilities;

public class Foo {

    public static <T> T getOnFxAndWaitOnEdt(Supplier<? extends T> supplier) {
        Objects.requireNonNull(supplier, "supplier");
        if (!EventQueue.isDispatchThread()) {
            throw new IllegalStateException("current thread != EDT");
        }

        final SecondaryLoop loop = Toolkit.getDefaultToolkit()
                .getSystemEventQueue()
                .createSecondaryLoop();
        final AtomicReference<T> valueRef = new AtomicReference<>();

        Platform.runLater(() -> {
            valueRef.set(supplier.get());
            SwingUtilities.invokeLater(loop::exit);
        });
        loop.enter();

        return valueRef.get();
    }

    public static <T> T getOnEdtAndWaitOnFx(Supplier<? extends T> supplier) {
        Objects.requireNonNull(supplier, "supplier");
        if (!Platform.isFxApplicationThread()) {
            throw new IllegalStateException(
                    "current thread != JavaFX Application Thread");
        }

        final Object key = new Object();
        final AtomicReference<T> valueRef = new AtomicReference<>();

        SwingUtilities.invokeLater(() -> {
            valueRef.set(supplier.get());
            Platform.runLater(() -> Platform.exitNestedEventLoop(key, null));
        });
        Platform.enterNestedEventLoop(key);

        return valueRef.get();
    }

}

JavaFX 9 中添加了 Platform#enterNestedEventLoopPlatform#exitNestedEventLoop 方法,尽管 JavaFX 8 中有等效的内部方法。使用 AtomicReference 的原因是局部变量必须是 final 或在 lambda 表达式中使用时有效地最终。但是,由于通知单独线程的方式,我不认为 AtomicReference#get()#set(T) 方法提供的波动性语义是严格需要的,但我已经使用了这些方法以防万一

下面是一个使用上面的示例显示来自 Event Dispatch Thread 的模态 JavaFX 对话框的示例:

Optional<T> optional = Foo.getOnFxAndWaitOnEdt(() -> {
    Dialog<T> dialog = new Dialog<>();
    // configure dialog...
    return dialog.showAndWait();
});

上述实用方法用于从 事件调度线程 JavaFX 应用程序线程 的通信,反之亦然。这就是为什么必须进入嵌套事件循环的原因,否则 UI 线程之一将不得不阻塞并冻结关联的 UI。如果您在非 UI 线程上并且需要 运行 在等待结果的同时在 UI 线程上执行操作,则解决方案要简单得多:

// Run on EDT
T result = CompletableFuture.supplyAysnc(/*Supplier*/, SwingUtilities::invokeLater).join();

// Run on FX thread
T result = CompletableFuture.supplyAsync(/*Supplier*/, Platform::runLater).join();

join() 的调用将阻塞调用线程,因此请确保不要从任何 UI 线程调用该方法。