在第一个线程锁定到第一个任务时的多个锁中,如何使第二个线程不闲置而是锁定到下一个任务?
In multiple locks when first thread has locked on to first task, how to make second thread to not sit idle and instead lock on to next task?
处理器class-
public class Processor extends Thread {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
private void doJob1() {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
}
private void doJob2() {
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
}
public void run() {
doJob1();
doJob2();
}
}
主要方法-
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
这里首先 运行 Thread-0 或 Thread-1 锁定了 job1(),另一个闲置了 5 秒。
第一个线程释放job1()的锁并锁定job2()后,第二个线程获取job1()的锁。
我希望第二个线程不应该闲置,因为 job1() 被第一个线程锁定,它应该先锁定 job2(),然后再锁定 job1()。
如何操作?
注意:这是基本蓝图。实际上,即使有 100 个任务和 5 个线程,我也希望我的代码能够正常工作。
我想,您正在寻找一种方法来 “如果可能的话获取一个锁,如果它不是免费的则做其他事情”。
您可以使用 ReentrantLock#tryLock
。
方法:
Acquires the lock only if it is not held by another thread at the time of invocation.
它 returns:
true
if the lock was free and was acquired by the current thread, or the lock was already held by the current thread; and false
otherwise
这是问题代码的修改版本:
- 如果第一个任务的锁是空闲的,线程将运行第一个任务
- 否则它会运行它在第二个
之后
Processor.java
:
public class Processor extends Thread {
private static final Lock lock1 = new ReentrantLock();
private static final Lock lock2 = new ReentrantLock();
@SneakyThrows
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
@SneakyThrows
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
boolean executedFirst = false;
if (lock1.tryLock()) {
try {
doJob1();
executedFirst = true;
} finally {
lock1.unlock();
}
}
try {
lock2.lock();
doJob2();
} finally {
lock2.unlock();
}
if (!executedFirst) {
try {
lock1.lock();
doJob1();
} finally {
lock1.unlock();
}
}
}
public static void main(String[] args) {
new Processor().start();
new Processor().start();
}
}
示例输出:
Thread-1 doing job2
Thread-0 doing job1
Thread-0 completed job1
Thread-1 completed job2
Thread-1 doing job1
Thread-0 doing job2
Thread-1 completed job1
Thread-0 completed job2
请注意,lock
/tryLock
和 unlock
调用包含在 try
/finally
中
进入篮筐和球:
Color.java
:
public enum Color {
RED,
GREEN,
BLUE;
public static Color fromOrdinal(int i) {
for (Color value : values()) {
if (value.ordinal() == i) {
return value;
}
}
throw new IllegalStateException("Unknown ordinal = " + i);
}
}
Basket.java
:
@Data(staticConstructor = "of")
public class Basket {
private final Color color;
// balls that this basket has
private final List<Ball> balls = new ArrayList<>();
private final Lock lock = new ReentrantLock();
}
Ball.java
@Value(staticConstructor = "of")
public class Ball {
Color color;
}
Boy.java
- 每个男孩选一个球(
queue.poll()
)
- 跑进篮筐(
baskets.get(color)
颜色相同)
- 根据篮筐被占用时的行为,他:
- 扔掉球并重试(代码中的选项
a
)
- 等到篮子被释放(选项
b
)
- 请注意,使用选项
a
时,一些 boy
可能会在另一个人扔掉 ball
并且还没有人捡起它时终止(无论如何,有人会把它捡起来放进篮子里)
@RequiredArgsConstructor
public class Boy implements Runnable {
private final Map<Color, Basket> baskets;
private final Queue<Ball> balls;
@Override
public void run() {
Ball ball;
while ((ball = balls.poll()) != null) {
Color color = ball.getColor();
Basket basket = baskets.get(color);
// a
if (basket.getLock().tryLock()) {
try {
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
} else {
balls.offer(ball);
}
// b
/*
try {
basket.getLock().lock();
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
*/
}
}
}
最后 main
:
Queue<Ball> balls = new LinkedBlockingQueue<>();
ThreadLocalRandom.current().ints(0, 3)
.mapToObj(Color::fromOrdinal)
.map(Ball::of)
.limit(1000)
.forEach(balls::add);
Map<Color, Basket> baskets = Map.of(
Color.RED, Basket.of(Color.RED),
Color.GREEN, Basket.of(Color.GREEN),
Color.BLUE, Basket.of(Color.BLUE)
);
List<Thread> threads = IntStream.range(0, 100)
.mapToObj(ignore -> new Boy(baskets, balls))
.map(Thread::new)
.collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
baskets.forEach((color, basket) -> System.out.println("There are "
+ basket.getBalls().size() + " ball(-s) in " + color + " basket"));
输出示例:
There are 331 ball(-s) in GREEN basket
There are 330 ball(-s) in BLUE basket
There are 339 ball(-s) in RED basket
这是一个稍微复杂一些的示例,其中包含作业的对象和指示作业是否已执行的条件变量,以及包装器如何使 ReentrantLock
适应 try-with- 的示例资源声明。
/**
* A Job represents a unit of work that needs to be performed once and
* depends upon a lock which it must hold while the work is performed.
*/
public class Job {
private final Runnable job;
private final ReentrantLock lock;
private boolean hasRun;
public Job(Runnable job, ReentrantLock lock) {
this.job = Objects.requireNonNull(job);
this.lock = Objects.requireNonNull(lock);
this.hasRun = false;
}
/**
* @returns true if the job has already been run
*/
public boolean hasRun() {
return hasRun;
}
// this is just to make the test in Processor more readable
public boolean hasNotRun() {
return !hasRun;
}
/**
* Tries to perform the job, returning immediately if the job has
* already been performed or the lock cannot be obtained.
*
* @returns true if the job was performed on this invocation
*/
public boolean tryPerform() {
if (hasRun) {
return false;
}
try (TryLocker locker = new TryLocker(lock)) {
if (locker.isLocked()) {
job.run();
hasRun = true;
}
}
return hasRun;
}
}
/**
* A Locker is an AutoCloseable wrapper around a ReentrantLock.
*/
public class Locker implements AutoCloseable {
private final ReentrantLock lock;
public Locker(final ReentrantLock lock) {
this.lock = lock;
lock.lock();
}
@Override
public void close() {
lock.unlock();
}
}
/**
* A TryLocker is an AutoCloseable wrapper around a ReentrantLock that calls
* its tryLock() method and provides a way to test whether than succeeded.
*/
public class TryLocker implements AutoCloseable {
private final ReentrantLock lock;
public TryLocker(final ReentrantLock lock) {
this.lock = lock.tryLock() ? lock : null;
}
public boolean isLocked() {
return lock != null;
}
@Override
public void close() {
if (isLocked()) {
lock.unlock();
}
}
}
/**
* A modified version of the Processor class from the question.
*/
public class Processor extends Thread {
private static final ReentrantLock lock1 = new ReentrantLock();
private static final ReentrantLock lock2 = new ReentrantLock();
private void snooze(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
Job job1 = new Job(() -> doJob1(), lock1);
Job job2 = new Job(() -> doJob2(), lock2);
List<Job> jobs = List.of(job1, job2);
while (jobs.stream().anyMatch(Job::hasNotRun)) {
jobs.forEach(Job::tryPerform);
}
}
public static void main(String[] args) {
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
}
}
一些注意事项:
Processor
中的 run()
方法现在概括为 n 个作业列表。虽然尚未执行任何作业,但它将尝试执行它们,并在所有作业完成后完成。
-
TryLocker
class 是 AutoCloseable
,因此 Job
中的锁定和解锁可以通过在 try 中创建它的实例来完成-with-resources 语句。
Locker
class 在这里未使用,但演示了如何为阻塞 lock()
调用而不是 tryLock()
调用完成相同的事情。
TryLocker
也可以花费一段时间并调用 tryLock
的重载,如果需要的话,它会在放弃之前等待一段时间;该修改留作 reader. 的练习
-
Job
的 hasNotRun()
方法只是为了让 anyMatch(Job::hasNotRun)
在 Processor
的 run()
方法中更具可读性;它可能没有发挥作用,可以放弃。
- 储物柜 classes 不使用
Objects.requireNonNull
检查传入的锁不为空;他们通过调用它的方法立即使用它,所以如果它为空,他们仍然会抛出一个 NPE,但是明确地 requireNonNull 可能会让他们更清楚。
- 储物柜 class 在调用
unlock()
使它们幂等之前,不会费心去检查它们是否已经解锁了 ReentrantLock
;在这种情况下,他们会抛出 IllegalMonitorStateException
。在过去,我写了一个带有标志变量的变体来避免这种情况,但由于打算在 try-with-resources 语句中使用它们,它只会调用 close()
方法一次,我觉得有人手动调用close方法还是让他们炸掉比较好
处理器class-
public class Processor extends Thread {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
private void doJob1() {
synchronized (lock1) {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
}
private void doJob2() {
synchronized (lock2) {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
}
public void run() {
doJob1();
doJob2();
}
}
主要方法-
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
这里首先 运行 Thread-0 或 Thread-1 锁定了 job1(),另一个闲置了 5 秒。
第一个线程释放job1()的锁并锁定job2()后,第二个线程获取job1()的锁。
我希望第二个线程不应该闲置,因为 job1() 被第一个线程锁定,它应该先锁定 job2(),然后再锁定 job1()。
如何操作?
注意:这是基本蓝图。实际上,即使有 100 个任务和 5 个线程,我也希望我的代码能够正常工作。
我想,您正在寻找一种方法来 “如果可能的话获取一个锁,如果它不是免费的则做其他事情”。
您可以使用 ReentrantLock#tryLock
。
方法:
Acquires the lock only if it is not held by another thread at the time of invocation.
它 returns:
true
if the lock was free and was acquired by the current thread, or the lock was already held by the current thread; andfalse
otherwise
这是问题代码的修改版本:
- 如果第一个任务的锁是空闲的,线程将运行第一个任务
- 否则它会运行它在第二个 之后
Processor.java
:
public class Processor extends Thread {
private static final Lock lock1 = new ReentrantLock();
private static final Lock lock2 = new ReentrantLock();
@SneakyThrows
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
@SneakyThrows
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
boolean executedFirst = false;
if (lock1.tryLock()) {
try {
doJob1();
executedFirst = true;
} finally {
lock1.unlock();
}
}
try {
lock2.lock();
doJob2();
} finally {
lock2.unlock();
}
if (!executedFirst) {
try {
lock1.lock();
doJob1();
} finally {
lock1.unlock();
}
}
}
public static void main(String[] args) {
new Processor().start();
new Processor().start();
}
}
示例输出:
Thread-1 doing job2
Thread-0 doing job1
Thread-0 completed job1
Thread-1 completed job2
Thread-1 doing job1
Thread-0 doing job2
Thread-1 completed job1
Thread-0 completed job2
请注意,lock
/tryLock
和 unlock
调用包含在 try
/finally
进入篮筐和球:
Color.java
:
public enum Color {
RED,
GREEN,
BLUE;
public static Color fromOrdinal(int i) {
for (Color value : values()) {
if (value.ordinal() == i) {
return value;
}
}
throw new IllegalStateException("Unknown ordinal = " + i);
}
}
Basket.java
:
@Data(staticConstructor = "of")
public class Basket {
private final Color color;
// balls that this basket has
private final List<Ball> balls = new ArrayList<>();
private final Lock lock = new ReentrantLock();
}
Ball.java
@Value(staticConstructor = "of")
public class Ball {
Color color;
}
Boy.java
- 每个男孩选一个球(
queue.poll()
) - 跑进篮筐(
baskets.get(color)
颜色相同) - 根据篮筐被占用时的行为,他:
- 扔掉球并重试(代码中的选项
a
) - 等到篮子被释放(选项
b
)
- 扔掉球并重试(代码中的选项
- 请注意,使用选项
a
时,一些boy
可能会在另一个人扔掉ball
并且还没有人捡起它时终止(无论如何,有人会把它捡起来放进篮子里)
@RequiredArgsConstructor
public class Boy implements Runnable {
private final Map<Color, Basket> baskets;
private final Queue<Ball> balls;
@Override
public void run() {
Ball ball;
while ((ball = balls.poll()) != null) {
Color color = ball.getColor();
Basket basket = baskets.get(color);
// a
if (basket.getLock().tryLock()) {
try {
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
} else {
balls.offer(ball);
}
// b
/*
try {
basket.getLock().lock();
basket.getBalls().add(ball);
} finally {
basket.getLock().unlock();
}
*/
}
}
}
最后 main
:
Queue<Ball> balls = new LinkedBlockingQueue<>();
ThreadLocalRandom.current().ints(0, 3)
.mapToObj(Color::fromOrdinal)
.map(Ball::of)
.limit(1000)
.forEach(balls::add);
Map<Color, Basket> baskets = Map.of(
Color.RED, Basket.of(Color.RED),
Color.GREEN, Basket.of(Color.GREEN),
Color.BLUE, Basket.of(Color.BLUE)
);
List<Thread> threads = IntStream.range(0, 100)
.mapToObj(ignore -> new Boy(baskets, balls))
.map(Thread::new)
.collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
thread.join();
}
baskets.forEach((color, basket) -> System.out.println("There are "
+ basket.getBalls().size() + " ball(-s) in " + color + " basket"));
输出示例:
There are 331 ball(-s) in GREEN basket
There are 330 ball(-s) in BLUE basket
There are 339 ball(-s) in RED basket
这是一个稍微复杂一些的示例,其中包含作业的对象和指示作业是否已执行的条件变量,以及包装器如何使 ReentrantLock
适应 try-with- 的示例资源声明。
/**
* A Job represents a unit of work that needs to be performed once and
* depends upon a lock which it must hold while the work is performed.
*/
public class Job {
private final Runnable job;
private final ReentrantLock lock;
private boolean hasRun;
public Job(Runnable job, ReentrantLock lock) {
this.job = Objects.requireNonNull(job);
this.lock = Objects.requireNonNull(lock);
this.hasRun = false;
}
/**
* @returns true if the job has already been run
*/
public boolean hasRun() {
return hasRun;
}
// this is just to make the test in Processor more readable
public boolean hasNotRun() {
return !hasRun;
}
/**
* Tries to perform the job, returning immediately if the job has
* already been performed or the lock cannot be obtained.
*
* @returns true if the job was performed on this invocation
*/
public boolean tryPerform() {
if (hasRun) {
return false;
}
try (TryLocker locker = new TryLocker(lock)) {
if (locker.isLocked()) {
job.run();
hasRun = true;
}
}
return hasRun;
}
}
/**
* A Locker is an AutoCloseable wrapper around a ReentrantLock.
*/
public class Locker implements AutoCloseable {
private final ReentrantLock lock;
public Locker(final ReentrantLock lock) {
this.lock = lock;
lock.lock();
}
@Override
public void close() {
lock.unlock();
}
}
/**
* A TryLocker is an AutoCloseable wrapper around a ReentrantLock that calls
* its tryLock() method and provides a way to test whether than succeeded.
*/
public class TryLocker implements AutoCloseable {
private final ReentrantLock lock;
public TryLocker(final ReentrantLock lock) {
this.lock = lock.tryLock() ? lock : null;
}
public boolean isLocked() {
return lock != null;
}
@Override
public void close() {
if (isLocked()) {
lock.unlock();
}
}
}
/**
* A modified version of the Processor class from the question.
*/
public class Processor extends Thread {
private static final ReentrantLock lock1 = new ReentrantLock();
private static final ReentrantLock lock2 = new ReentrantLock();
private void snooze(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void doJob1() {
System.out.println(Thread.currentThread().getName() + " doing job1");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job1");
}
private void doJob2() {
System.out.println(Thread.currentThread().getName() + " doing job2");
snooze(5000);
System.out.println(Thread.currentThread().getName() + " completed job2");
}
public void run() {
Job job1 = new Job(() -> doJob1(), lock1);
Job job2 = new Job(() -> doJob2(), lock2);
List<Job> jobs = List.of(job1, job2);
while (jobs.stream().anyMatch(Job::hasNotRun)) {
jobs.forEach(Job::tryPerform);
}
}
public static void main(String[] args) {
final Processor processor1 = new Processor();
final Processor processor2 = new Processor();
processor1.start();
processor2.start();
}
}
一些注意事项:
Processor
中的run()
方法现在概括为 n 个作业列表。虽然尚未执行任何作业,但它将尝试执行它们,并在所有作业完成后完成。-
TryLocker
class 是AutoCloseable
,因此Job
中的锁定和解锁可以通过在 try 中创建它的实例来完成-with-resources 语句。 Locker
class 在这里未使用,但演示了如何为阻塞lock()
调用而不是tryLock()
调用完成相同的事情。TryLocker
也可以花费一段时间并调用tryLock
的重载,如果需要的话,它会在放弃之前等待一段时间;该修改留作 reader. 的练习
-
Job
的hasNotRun()
方法只是为了让anyMatch(Job::hasNotRun)
在Processor
的run()
方法中更具可读性;它可能没有发挥作用,可以放弃。 - 储物柜 classes 不使用
Objects.requireNonNull
检查传入的锁不为空;他们通过调用它的方法立即使用它,所以如果它为空,他们仍然会抛出一个 NPE,但是明确地 requireNonNull 可能会让他们更清楚。 - 储物柜 class 在调用
unlock()
使它们幂等之前,不会费心去检查它们是否已经解锁了ReentrantLock
;在这种情况下,他们会抛出IllegalMonitorStateException
。在过去,我写了一个带有标志变量的变体来避免这种情况,但由于打算在 try-with-resources 语句中使用它们,它只会调用close()
方法一次,我觉得有人手动调用close方法还是让他们炸掉比较好