奇怪的 Java 多线程行为
Weird Java multithreading behaviour
我在一个由 REST API 组成的应用程序中工作,该应用程序公开了多个端点,其中之一是一种加载系统数据的初始化操作,可以在应用程序生命周期中多次调用, 删除现有数据并加载新数据。其余端点对数据执行某些操作,并将一些额外数据引入系统,这些数据将在调用初始化端点时被删除。考虑到应用程序应该在并发场景下始终如一地工作,我决定在 ReentrantReadWriteLock
的基础上实现 Synchronizer
class,其中只有初始化操作会锁定写入锁,其余操作将锁定在读锁上。但是,在测试我的实现时我遇到了一个奇怪的行为,我想有些东西我不太了解,不确定是关于我的实现还是测试本身。
public class LockBasedSynchronizer implements Synchronizer {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override
public void startNonBlocking() {
lock.readLock().lock();
}
@Override
public void endNonBlocking() {
lock.readLock().unlock();
}
@Override
public void startBlocking() {
lock.writeLock().lock();
}
@Override
public void endBlocking() {
lock.writeLock().unlock();
}
}
class LockBasedSynchronizerTest {
private LockBasedSynchronizer synchronizer;
@BeforeEach
void setUp() {
synchronizer = new LockBasedSynchronizer();
}
@Test
void itDoesntBlockNonBlockingOperations() {
List<String> outputScrapper = new ArrayList<>();
List<Worker> threads = asList(
new NonBlockingWorker(synchronizer, outputScrapper, 300, "first"),
new NonBlockingWorker(synchronizer, outputScrapper, 100, "second"),
new NonBlockingWorker(synchronizer, outputScrapper, 10, "third")
);
threads.parallelStream().forEach(Worker::run); // this way it works
// threads.forEach(Worker::run); // this way it fails
assertThat(outputScrapper).containsExactly("third", "second", "first");
}
@AllArgsConstructor
abstract class Worker extends Thread {
protected Synchronizer synchronizer;
private List<String> outputScraper;
private int delayInMilliseconds;
private String id;
protected abstract void lock();
protected abstract void unlock();
@Override
public void run() {
try {
System.out.println(format("Starting [%s]", id));
sleep(delayInMilliseconds);
lock();
outputScraper.add(id);
unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class NonBlockingWorker extends Worker {
public NonBlockingWorker(Synchronizer synchronizer, List<String> outputScraper, int delayInMilliseconds, String id) {
super(synchronizer, outputScraper, delayInMilliseconds, id);
}
@Override
protected void lock() {
synchronizer.startNonBlocking();
}
@Override
protected void unlock() {
synchronizer.endNonBlocking();
}
}
如您所见,当我通过并行流并行生成工作线程时,它会按预期工作,但如果我 运行 按顺序排列它们,它们就不会按预期工作,而是在他们 运行 的顺序完全相同。看起来它们是使用一个线程执行的。有人可以解释为什么吗?
此外,我想到的另一个关于此实现的疑问是,如果阻塞操作正在等待写锁并且非阻塞操作不停地到达,会发生什么情况。它会无限期地等待直到非阻塞操作停止到达吗?
除此之外,对于暴露的问题,如果您能想到更好的解决方案,欢迎提出建议。
首先,threads.forEach(Worker::run)
的主要问题是它实际上并没有同时 运行 任何东西,您只是遍历 Worker
列表并调用它们的 run()
方法,一个接一个。
现在再补充几点。
一般来说,不要扩展 Thread
。而是创建一些 Runnable
或 Callable
或一些其他接口 成为 线程中的 运行,通常通过 Executor
。你不是想重新定义 Thread
你只是想同时 运行 一些事情。
此外,使用您现在拥有的代码,您无论如何都不会使用扩展 Thread
,因为您只是为 Worker
应用 run()
方法在你的 Stream
中,它实际上可以是任何 class 在你的流或列表中的方法。
我认为最好的办法是让 Worker
实现 Runnable
,而不是扩展 Thread
,然后将那些 Workers
提供给 Executor
.或者简单地将 run()
应用于 parallelStream()
的元素也可以,我猜,只要知道你在做什么。
调用 Thread::run 是阻塞的,它不会创建新线程。您需要调用方法 Thread::start 并等待 1 秒让工作人员完成:
threads.forEach(Worker::start);
Thread.sleep(1000);
我在一个由 REST API 组成的应用程序中工作,该应用程序公开了多个端点,其中之一是一种加载系统数据的初始化操作,可以在应用程序生命周期中多次调用, 删除现有数据并加载新数据。其余端点对数据执行某些操作,并将一些额外数据引入系统,这些数据将在调用初始化端点时被删除。考虑到应用程序应该在并发场景下始终如一地工作,我决定在 ReentrantReadWriteLock
的基础上实现 Synchronizer
class,其中只有初始化操作会锁定写入锁,其余操作将锁定在读锁上。但是,在测试我的实现时我遇到了一个奇怪的行为,我想有些东西我不太了解,不确定是关于我的实现还是测试本身。
public class LockBasedSynchronizer implements Synchronizer {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@Override
public void startNonBlocking() {
lock.readLock().lock();
}
@Override
public void endNonBlocking() {
lock.readLock().unlock();
}
@Override
public void startBlocking() {
lock.writeLock().lock();
}
@Override
public void endBlocking() {
lock.writeLock().unlock();
}
}
class LockBasedSynchronizerTest {
private LockBasedSynchronizer synchronizer;
@BeforeEach
void setUp() {
synchronizer = new LockBasedSynchronizer();
}
@Test
void itDoesntBlockNonBlockingOperations() {
List<String> outputScrapper = new ArrayList<>();
List<Worker> threads = asList(
new NonBlockingWorker(synchronizer, outputScrapper, 300, "first"),
new NonBlockingWorker(synchronizer, outputScrapper, 100, "second"),
new NonBlockingWorker(synchronizer, outputScrapper, 10, "third")
);
threads.parallelStream().forEach(Worker::run); // this way it works
// threads.forEach(Worker::run); // this way it fails
assertThat(outputScrapper).containsExactly("third", "second", "first");
}
@AllArgsConstructor
abstract class Worker extends Thread {
protected Synchronizer synchronizer;
private List<String> outputScraper;
private int delayInMilliseconds;
private String id;
protected abstract void lock();
protected abstract void unlock();
@Override
public void run() {
try {
System.out.println(format("Starting [%s]", id));
sleep(delayInMilliseconds);
lock();
outputScraper.add(id);
unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class NonBlockingWorker extends Worker {
public NonBlockingWorker(Synchronizer synchronizer, List<String> outputScraper, int delayInMilliseconds, String id) {
super(synchronizer, outputScraper, delayInMilliseconds, id);
}
@Override
protected void lock() {
synchronizer.startNonBlocking();
}
@Override
protected void unlock() {
synchronizer.endNonBlocking();
}
}
如您所见,当我通过并行流并行生成工作线程时,它会按预期工作,但如果我 运行 按顺序排列它们,它们就不会按预期工作,而是在他们 运行 的顺序完全相同。看起来它们是使用一个线程执行的。有人可以解释为什么吗?
此外,我想到的另一个关于此实现的疑问是,如果阻塞操作正在等待写锁并且非阻塞操作不停地到达,会发生什么情况。它会无限期地等待直到非阻塞操作停止到达吗?
除此之外,对于暴露的问题,如果您能想到更好的解决方案,欢迎提出建议。
首先,threads.forEach(Worker::run)
的主要问题是它实际上并没有同时 运行 任何东西,您只是遍历 Worker
列表并调用它们的 run()
方法,一个接一个。
现在再补充几点。
一般来说,不要扩展 Thread
。而是创建一些 Runnable
或 Callable
或一些其他接口 成为 线程中的 运行,通常通过 Executor
。你不是想重新定义 Thread
你只是想同时 运行 一些事情。
此外,使用您现在拥有的代码,您无论如何都不会使用扩展 Thread
,因为您只是为 Worker
应用 run()
方法在你的 Stream
中,它实际上可以是任何 class 在你的流或列表中的方法。
我认为最好的办法是让 Worker
实现 Runnable
,而不是扩展 Thread
,然后将那些 Workers
提供给 Executor
.或者简单地将 run()
应用于 parallelStream()
的元素也可以,我猜,只要知道你在做什么。
调用 Thread::run 是阻塞的,它不会创建新线程。您需要调用方法 Thread::start 并等待 1 秒让工作人员完成:
threads.forEach(Worker::start);
Thread.sleep(1000);