必须手动中断的单元测试异步计算

Unit testing asynchronous computation that has to be interrupted manually

我有一个 class 可以异步记录眼动数据。有方法startstop记录过程。数据收集在一个集合中,只有在记录线程完成其工作后才能访问该集合。它基本上封装了所有线程和同步,因此我的库的用户不必这样做。

大大缩短的代码(省略了泛型和错误处理):

public class Recorder {
  private Collection accumulatorCollection;
  private Thread recordingThread;

  private class RecordingRunnable implements Runnable {
    ...

    public void run() {
      while(!Thread.currentThread().isInterrupted()) {
        // fetch data and collect it in the accumulator
        synchronized(acc) { acc.add(Eyetracker.getData()) }
      }
    }
  }

  public void start() {
    accumulatorCollection = new Collection();
    recordingThread = new Thread(new RecordingRunnable(accumulatorCollection));
    recordingThread.start();
  }

  public void stop() {
    recordingThread.interrupt();
  }

  public void getData() {
    try {
      recordingThread.join(2000);
      if(recordingThread.isAlive()) { throw Exception(); }
    }
    catch(InterruptedException e) { ... }

    synchronized(accumulatorCollection) { return accumulatorCollection; }
  }
}

用法很简单:

recorder.start();
...
recorder.stop();
Collection data = recorder.getData();

我的问题是如何测试它。目前我是这样做的:

recorder.start();
Thread.sleep(50);
recorder.stop();
Collection data = recorder.getData();
assert(stuff);

这有效,但它是不确定的,并且会大大降低测试套件的速度(我将这些测试标记为集成测试,因此它们必须单独 运行 才能避免这个问题)。

有没有更好的方法?

有更好的方法使用 CountDownLatch

测试的不确定性部分源于您未考虑的两个时间变量:

  • 创建和启动线程需要时间,并且线程可能在 Thread.start() returns 时尚未开始执行可运行对象(可运行对象将被执行,但可能会稍晚一些)。
  • stop/interrupt 会中断 Runnable 中的 while 循环,但不会立即中断,可能会稍晚一些。

这是 CountDownLatch 的用武之地:它为您提供有关另一个线程正在执行的位置的准确信息。例如。让第一个线程在闩锁上等待,而第二个 "counts down" 闩锁作为 runnable 中的最后一条语句,现在第一个线程知道 runnable 已完成。 CountDownLatch 还充当同步器:无论第二个线程写入内存,现在都可以由第一个线程读取。

除了使用中断,您还可以使用 volatile 布尔值。任何读取 volatile 变量的线程都保证看到任何其他线程设置的最后一个值。

A CountDownLatch 也可以给一个超时,这对可以挂起的测试很有用:如果你必须等待很长时间,你可以中止整个测试(例如关闭执行程序,中断线程)并抛出一个AssertionError。在下面的代码中,我重新使用超时来等待收集一定数量的数据,而不是 'sleeping'.

作为优化,使用执行器 (ThreadPool) 而不是创建和启动线程。后者相对昂贵,使用 Executor 确实可以有所作为。

在更新的代码下方,我使它可以作为应用程序运行(main 方法)。 (编辑 28/02/17:在 while 循环中检查 maxCollect > 0)

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Recorder {

    private final ExecutorService executor;
    private Thread recordingThread;
    private volatile boolean stopRecording;
    private CountDownLatch finishedRecording;
    private Collection<Object> eyeData;
    private int maxCollect;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean stopped = new AtomicBoolean();

    public Recorder() {
        this(null);
    }

    public Recorder(ExecutorService executor) {
        this.executor = executor;
    }

    public Recorder maxCollect(int max) { maxCollect = max; return this; }

    private class RecordingRunnable implements Runnable {

        @Override public void run() {

            try {
                int collected = 0;
                while (!stopRecording) {
                    eyeData.add(EyeTracker.getData());
                    if (maxCollect > 0 && ++collected >= maxCollect) {
                        stopRecording = true;
                    }
                }
            } finally {
                finishedRecording.countDown();
            }
        }
    }

    public Recorder start() {

        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("already started");
        }
        stopRecording = false;
        finishedRecording = new CountDownLatch(1);
        eyeData = new ArrayList<Object>();
        // the RecordingRunnable created below will see the values assigned above ('happens before relationship')
        if (executor == null) {
            recordingThread = new Thread(new RecordingRunnable());
            recordingThread.start();
        } else {
            executor.execute(new RecordingRunnable());
        }
        return this;
    }

    public Collection<Object> getData(long timeout, TimeUnit tunit) {

        if (started.get() == false) {
            throw new IllegalStateException("start first");
        }
        if (!stopped.compareAndSet(false, true)) {
            throw new IllegalStateException("data already fetched");
        }
        if (maxCollect <= 0) {
            stopRecording = true;
        }
        boolean recordingStopped = false;
        try {
            // this establishes a 'happens before relationship'
            // all updates to eyeData are now visible in this thread.
            recordingStopped = finishedRecording.await(timeout, tunit);
        } catch(InterruptedException e) { 
            throw new RuntimeException("interrupted", e);
        } finally {
            stopRecording = true;
        }
        // if recording did not stop, do not return the eyeData (could stil be modified by recording-runnable).
        if (!recordingStopped) {
            throw new RuntimeException("recording");
        }
        // only when everything is OK this recorder instance can be re-used
        started.set(false);
        stopped.set(false);
        return eyeData;
    }

    public static class EyeTracker {

        public static Object getData() {
            try { Thread.sleep(1); } catch (Exception ignored) {}
            return new Object();
        }
    }

    public static void main(String[] args) {

        System.out.println("Starting.");
        ExecutorService exe = Executors.newSingleThreadExecutor();
        try { 
            Recorder r = new Recorder(exe).maxCollect(50).start();
            int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
            r.maxCollect(100).start();
            dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
            r.maxCollect(0).start();
            Thread.sleep(100);
            dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
            System.out.println("Collected "  + dsize);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exe.shutdownNow();
            System.out.println("Done.");
        }
    }
}

编码愉快:)