必须手动中断的单元测试异步计算
Unit testing asynchronous computation that has to be interrupted manually
我有一个 class 可以异步记录眼动数据。有方法start
和stop
记录过程。数据收集在一个集合中,只有在记录线程完成其工作后才能访问该集合。它基本上封装了所有线程和同步,因此我的库的用户不必这样做。
大大缩短的代码(省略了泛型和错误处理):
public class Recorder {
private Collection accumulatorCollection;
private Thread recordingThread;
private class RecordingRunnable implements Runnable {
...
public void run() {
while(!Thread.currentThread().isInterrupted()) {
// fetch data and collect it in the accumulator
synchronized(acc) { acc.add(Eyetracker.getData()) }
}
}
}
public void start() {
accumulatorCollection = new Collection();
recordingThread = new Thread(new RecordingRunnable(accumulatorCollection));
recordingThread.start();
}
public void stop() {
recordingThread.interrupt();
}
public void getData() {
try {
recordingThread.join(2000);
if(recordingThread.isAlive()) { throw Exception(); }
}
catch(InterruptedException e) { ... }
synchronized(accumulatorCollection) { return accumulatorCollection; }
}
}
用法很简单:
recorder.start();
...
recorder.stop();
Collection data = recorder.getData();
我的问题是如何测试它。目前我是这样做的:
recorder.start();
Thread.sleep(50);
recorder.stop();
Collection data = recorder.getData();
assert(stuff);
这有效,但它是不确定的,并且会大大降低测试套件的速度(我将这些测试标记为集成测试,因此它们必须单独 运行 才能避免这个问题)。
有没有更好的方法?
有更好的方法使用 CountDownLatch。
测试的不确定性部分源于您未考虑的两个时间变量:
- 创建和启动线程需要时间,并且线程可能在
Thread.start()
returns 时尚未开始执行可运行对象(可运行对象将被执行,但可能会稍晚一些)。
- stop/interrupt 会中断 Runnable 中的 while 循环,但不会立即中断,可能会稍晚一些。
这是 CountDownLatch
的用武之地:它为您提供有关另一个线程正在执行的位置的准确信息。例如。让第一个线程在闩锁上等待,而第二个 "counts down" 闩锁作为 runnable 中的最后一条语句,现在第一个线程知道 runnable 已完成。 CountDownLatch
还充当同步器:无论第二个线程写入内存,现在都可以由第一个线程读取。
除了使用中断,您还可以使用 volatile
布尔值。任何读取 volatile
变量的线程都保证看到任何其他线程设置的最后一个值。
A CountDownLatch
也可以给一个超时,这对可以挂起的测试很有用:如果你必须等待很长时间,你可以中止整个测试(例如关闭执行程序,中断线程)并抛出一个AssertionError
。在下面的代码中,我重新使用超时来等待收集一定数量的数据,而不是 'sleeping'.
作为优化,使用执行器 (ThreadPool) 而不是创建和启动线程。后者相对昂贵,使用 Executor 确实可以有所作为。
在更新的代码下方,我使它可以作为应用程序运行(main
方法)。 (编辑 28/02/17:在 while 循环中检查 maxCollect > 0)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class Recorder {
private final ExecutorService executor;
private Thread recordingThread;
private volatile boolean stopRecording;
private CountDownLatch finishedRecording;
private Collection<Object> eyeData;
private int maxCollect;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();
public Recorder() {
this(null);
}
public Recorder(ExecutorService executor) {
this.executor = executor;
}
public Recorder maxCollect(int max) { maxCollect = max; return this; }
private class RecordingRunnable implements Runnable {
@Override public void run() {
try {
int collected = 0;
while (!stopRecording) {
eyeData.add(EyeTracker.getData());
if (maxCollect > 0 && ++collected >= maxCollect) {
stopRecording = true;
}
}
} finally {
finishedRecording.countDown();
}
}
}
public Recorder start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("already started");
}
stopRecording = false;
finishedRecording = new CountDownLatch(1);
eyeData = new ArrayList<Object>();
// the RecordingRunnable created below will see the values assigned above ('happens before relationship')
if (executor == null) {
recordingThread = new Thread(new RecordingRunnable());
recordingThread.start();
} else {
executor.execute(new RecordingRunnable());
}
return this;
}
public Collection<Object> getData(long timeout, TimeUnit tunit) {
if (started.get() == false) {
throw new IllegalStateException("start first");
}
if (!stopped.compareAndSet(false, true)) {
throw new IllegalStateException("data already fetched");
}
if (maxCollect <= 0) {
stopRecording = true;
}
boolean recordingStopped = false;
try {
// this establishes a 'happens before relationship'
// all updates to eyeData are now visible in this thread.
recordingStopped = finishedRecording.await(timeout, tunit);
} catch(InterruptedException e) {
throw new RuntimeException("interrupted", e);
} finally {
stopRecording = true;
}
// if recording did not stop, do not return the eyeData (could stil be modified by recording-runnable).
if (!recordingStopped) {
throw new RuntimeException("recording");
}
// only when everything is OK this recorder instance can be re-used
started.set(false);
stopped.set(false);
return eyeData;
}
public static class EyeTracker {
public static Object getData() {
try { Thread.sleep(1); } catch (Exception ignored) {}
return new Object();
}
}
public static void main(String[] args) {
System.out.println("Starting.");
ExecutorService exe = Executors.newSingleThreadExecutor();
try {
Recorder r = new Recorder(exe).maxCollect(50).start();
int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(100).start();
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(0).start();
Thread.sleep(100);
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
} catch (Exception e) {
e.printStackTrace();
} finally {
exe.shutdownNow();
System.out.println("Done.");
}
}
}
编码愉快:)
我有一个 class 可以异步记录眼动数据。有方法start
和stop
记录过程。数据收集在一个集合中,只有在记录线程完成其工作后才能访问该集合。它基本上封装了所有线程和同步,因此我的库的用户不必这样做。
大大缩短的代码(省略了泛型和错误处理):
public class Recorder {
private Collection accumulatorCollection;
private Thread recordingThread;
private class RecordingRunnable implements Runnable {
...
public void run() {
while(!Thread.currentThread().isInterrupted()) {
// fetch data and collect it in the accumulator
synchronized(acc) { acc.add(Eyetracker.getData()) }
}
}
}
public void start() {
accumulatorCollection = new Collection();
recordingThread = new Thread(new RecordingRunnable(accumulatorCollection));
recordingThread.start();
}
public void stop() {
recordingThread.interrupt();
}
public void getData() {
try {
recordingThread.join(2000);
if(recordingThread.isAlive()) { throw Exception(); }
}
catch(InterruptedException e) { ... }
synchronized(accumulatorCollection) { return accumulatorCollection; }
}
}
用法很简单:
recorder.start();
...
recorder.stop();
Collection data = recorder.getData();
我的问题是如何测试它。目前我是这样做的:
recorder.start();
Thread.sleep(50);
recorder.stop();
Collection data = recorder.getData();
assert(stuff);
这有效,但它是不确定的,并且会大大降低测试套件的速度(我将这些测试标记为集成测试,因此它们必须单独 运行 才能避免这个问题)。
有没有更好的方法?
有更好的方法使用 CountDownLatch。
测试的不确定性部分源于您未考虑的两个时间变量:
- 创建和启动线程需要时间,并且线程可能在
Thread.start()
returns 时尚未开始执行可运行对象(可运行对象将被执行,但可能会稍晚一些)。 - stop/interrupt 会中断 Runnable 中的 while 循环,但不会立即中断,可能会稍晚一些。
这是 CountDownLatch
的用武之地:它为您提供有关另一个线程正在执行的位置的准确信息。例如。让第一个线程在闩锁上等待,而第二个 "counts down" 闩锁作为 runnable 中的最后一条语句,现在第一个线程知道 runnable 已完成。 CountDownLatch
还充当同步器:无论第二个线程写入内存,现在都可以由第一个线程读取。
除了使用中断,您还可以使用 volatile
布尔值。任何读取 volatile
变量的线程都保证看到任何其他线程设置的最后一个值。
A CountDownLatch
也可以给一个超时,这对可以挂起的测试很有用:如果你必须等待很长时间,你可以中止整个测试(例如关闭执行程序,中断线程)并抛出一个AssertionError
。在下面的代码中,我重新使用超时来等待收集一定数量的数据,而不是 'sleeping'.
作为优化,使用执行器 (ThreadPool) 而不是创建和启动线程。后者相对昂贵,使用 Executor 确实可以有所作为。
在更新的代码下方,我使它可以作为应用程序运行(main
方法)。 (编辑 28/02/17:在 while 循环中检查 maxCollect > 0)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class Recorder {
private final ExecutorService executor;
private Thread recordingThread;
private volatile boolean stopRecording;
private CountDownLatch finishedRecording;
private Collection<Object> eyeData;
private int maxCollect;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();
public Recorder() {
this(null);
}
public Recorder(ExecutorService executor) {
this.executor = executor;
}
public Recorder maxCollect(int max) { maxCollect = max; return this; }
private class RecordingRunnable implements Runnable {
@Override public void run() {
try {
int collected = 0;
while (!stopRecording) {
eyeData.add(EyeTracker.getData());
if (maxCollect > 0 && ++collected >= maxCollect) {
stopRecording = true;
}
}
} finally {
finishedRecording.countDown();
}
}
}
public Recorder start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("already started");
}
stopRecording = false;
finishedRecording = new CountDownLatch(1);
eyeData = new ArrayList<Object>();
// the RecordingRunnable created below will see the values assigned above ('happens before relationship')
if (executor == null) {
recordingThread = new Thread(new RecordingRunnable());
recordingThread.start();
} else {
executor.execute(new RecordingRunnable());
}
return this;
}
public Collection<Object> getData(long timeout, TimeUnit tunit) {
if (started.get() == false) {
throw new IllegalStateException("start first");
}
if (!stopped.compareAndSet(false, true)) {
throw new IllegalStateException("data already fetched");
}
if (maxCollect <= 0) {
stopRecording = true;
}
boolean recordingStopped = false;
try {
// this establishes a 'happens before relationship'
// all updates to eyeData are now visible in this thread.
recordingStopped = finishedRecording.await(timeout, tunit);
} catch(InterruptedException e) {
throw new RuntimeException("interrupted", e);
} finally {
stopRecording = true;
}
// if recording did not stop, do not return the eyeData (could stil be modified by recording-runnable).
if (!recordingStopped) {
throw new RuntimeException("recording");
}
// only when everything is OK this recorder instance can be re-used
started.set(false);
stopped.set(false);
return eyeData;
}
public static class EyeTracker {
public static Object getData() {
try { Thread.sleep(1); } catch (Exception ignored) {}
return new Object();
}
}
public static void main(String[] args) {
System.out.println("Starting.");
ExecutorService exe = Executors.newSingleThreadExecutor();
try {
Recorder r = new Recorder(exe).maxCollect(50).start();
int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(100).start();
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(0).start();
Thread.sleep(100);
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
} catch (Exception e) {
e.printStackTrace();
} finally {
exe.shutdownNow();
System.out.println("Done.");
}
}
}
编码愉快:)