线程执行太多次并导致竞争条件,即使我正在使用锁
Thread executes too many times and causes race condition even though I'm using locks
我正在为一个用于模拟仓库的练习开发多线程应用程序(类似于生产者消费者问题)但是我 运行 在增加消费者数量的程序中遇到了一些麻烦线程使程序以意想不到的方式运行。
代码:
我正在创建一个名为 buyer 的生产者线程,其目标是每次从仓库中准确地订购 10 个订单。为此,他们有一个名为仓库的共享对象,买家可以在该对象上下订单,然后将订单存储在共享对象的缓冲区中。在此之后,买家会休眠一段时间,直到它再次尝试或所有包裹都已购买。执行此操作的代码如下所示:
public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
while(this.packsBought < 10) {
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}
//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);
//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());
//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}
//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}
如您所见,函数placeOrder(Order order);
用于在仓库下订单。此功能负责根据与主要状态相关的一些逻辑将订单放入队列中。该函数如下所示:
public void placeOrder(Order order) {
try{
//halt untill there are enough packs to handle an order.
this.notFullBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
//Insert the order in the buffer depending on prime status.
if (order.isPrime()) {
//prime order, insert behind all prime orders in buffer.
//Enumerate all non prime orders in the list.
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}
// Insert the prime order.
buffer[inPrime++] = order;
} else {
//No prime order, insert behind all orders in buffer.
buffer[inPrime + inNormal++] = order;
}
//Notify the DispatchWorkers that a new order has been placed.
this.notEmptyBuffer.release();
//Catch any InterruptException that might occure.
} catch(InterruptedException e){
//Even though this isn't expected behavior, there is no reason to
//notify the user of this event or to preform any other action as
//the thread will just return to the queue before placing another
//error if it is still required to do so.
} finally {
//Unlock and finalize the critical section.
mutexBuffer.unlock();
}
}
订单由充当消费者线程的工人消费。线程本身包含非常简单的代码循环,直到处理完所有订单。在此循环中,对同一个仓库对象调用不同的函数 handleOrder();
,该对象处理缓冲区中的单个订单。它使用以下代码执行此操作:
public void handleOrder(){
//Create a variable to store the order being handled.
Order toHandle = null;
try{
//wait until there is an order to handle.
this.notEmptyBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
//obtain the first order to handle as the first element of the buffer
toHandle = buffer[0];
//move all buffer elementst back by 1 position.
for(int i = 1; i < sizeOrderList; i++){
buffer[i - 1] = buffer[i];
}
//set the last element in the buffer to null
buffer[sizeOrderList - 1] = null;
//We have obtained an order from the buffer and now we can handle it.
if(toHandle != null) {
int nPacks = toHandle.getnPacks();
//wait until the appropriate resources are available.
this.hasBoxes.acquire(nPacks);
this.hasTape.acquire(nPacks * 50);
//Now we can handle the order (Simulated by sleeping. Although
//in real live Amazon workers also have about 5ms of time per
//package).
Thread.sleep(5 * nPacks);
//Calculate the total time this order took.
long time = System.currentTimeMillis() -
toHandle.getOrderTime();
//Update the total waiting time for the buyer.
toHandle.getBuyer().setWaitingTime(time +
toHandle.getBuyer().getWaitingTime());
//Check if the order to handle is prime or not.
if(toHandle.isPrime()) {
//Decrement the position of which prime orders are
//inserted into the buffer.
inPrime--;
} else {
//Decrement the position of which normal orders are
//inserted into the buffer.
inNormal--;
}
//Print a message informing the user a new order was completed.
System.out.println("An order has been completed for: "
+ toHandle.getBuyer().getName());
//Notify the buyer he has sucsessfully ordered a new package.
toHandle.getBuyer().setPacksBought(
toHandle.getBuyer().getPacksBought() + 1);
}else {
//Notify the user there was a critical error obtaining the
//error to handle. (There shouldn't exist a case where this
//should happen but you never know.)
System.err.println("Something went wrong obtaining an order.");
}
//Notify the buyers that a new spot has been opened in the buffer.
this.notFullBuffer.release();
//Catch any interrupt exceptions.
} catch(InterruptedException e){
//This is expected behavior as it allows us to force the thread to
//revaluate it's main running loop when notifying it to finish
//execution.
} finally {
//Check if the current thread is locking the buffer lock. This is
//done as in the case of an interrupt we don't want to execute this
//code if the thread interrupted doesn't hold the lock as that
//would result in an exception we don't want.
if (mutexBuffer.isHeldByCurrentThread())
//Unlock the buffer lock.
mutexBuffer.unlock();
}
}
问题:
为了验证程序的功能,我使用语句的输出:
System.out.println("An order has been completed for: "
+ toHandle.getBuyer().getName());
来自 handleOrder();
函数。我将整个输出放在一个文本文件中,删除该 println();
语句未添加的所有行并计算行数以了解已处理了多少订单。我希望这个值等于线程数乘以 10,但通常情况并非如此。 运行 测试 我注意到有时它确实有效并且没有问题,但有时一个或多个买家线程收到的订单多于应有的数量。有 5 个买家线程应该有 50 个输出,但我得到 50 到 60 行(订单位置)。
将线程数量增加到 30 会增加问题,现在我可以预期增加多达 50% 的订单,一些线程最多下 30 个订单。
做一些研究这称为数据争用,是由 2 个线程同时访问相同数据而其中 1 个线程写入数据引起的。这基本上改变了数据,使得另一个线程无法使用它期望使用的相同数据。
我的尝试:
我坚信 ReentrantLocks
旨在处理此类情况,因为如果另一个线程尚未离开,它们应该阻止任何线程进入一段代码。 placeOrder(Order order);
和 handleOrder();
函数都使用了这个机制。因此,我假设我没有正确实施。这是项目的一个版本,它可以从一个名为 Test.java 的文件中编译和执行。谁能看一下上面解释的代码并告诉我我做错了什么?
编辑
我注意到买家可以下 10 个以上的订单,所以我将代码更改为:
/*
* The run method which is ran once the thread is started.
*/
public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
for(packsBought = 0; packsBought < 10; packsBought++)
{
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}
//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);
//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());
//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}
//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}
在买家 run();
功能中,但我仍然收到一些下了超过 10 个订单的帖子。我还删除了 handleOrder();
函数中购买包数量的更新,因为现在不需要了。 here 是 Test.java 的更新版本(其中所有 类 都在一起以便于执行)这里似乎有不同的问题。
我相信你可能在追鬼。我不完全确定为什么您看到的输出比您预期的要多,但所下订单的数量似乎是有序的。请允许我澄清一下:
我在 Warehouse
class 中添加了 Map<String,Integer>
来映射每个线程放置的订单数量:
private Map<String,Integer> ordersPlaced = new TreeMap<>();
// Code omitted for brevity
public void placeOrder(Order order)
{
try
{
//halt untill there are enough packs to handle an order.
this.notFullBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
ordersPlaced.merge(Thread.currentThread().getName(), 1, Integer::sum);
// Rest of method
}
然后我在main
方法中添加了一个for循环来执行代码100次,并在每次迭代的末尾添加以下代码:
warehouse.ordersPlaced.forEach((thread, orders) -> System.out.printf(" %s - %d%n", thread, orders));
我在 lambda 表达式中放置了一个断点,条件为 orders != 10
。在我执行的 100 多次运行中从未触发过这种情况。据我所知,您的代码正在按预期工作。我已经将 nWorkers
和 nBuyers
都增加到 100 只是为了确定。
我相信您使用 ReentrantLock
是正确的,我同意它可能是您用例的最佳选择。
参考你在 pastebin 上的代码
一般问题:
在函数 public void handleOrder() he sleep (line 582) Thread.sleep(5 * nPacks);在 lock(): unlock(): 块内。
在这种睡眠状态下,拥有多个 DispatchWorker 是没有意义的,因为 n-1 将在第 559 行 this.mutexBuffer.lock() 等待,而另一个人正在第 582 行休眠。
错误:
该错误在第 173 行。您应该将其删除。
在您的 main() 中,您加入了所有买家,这是正确的。然后你试图阻止工人。此时的工人已经运行个完成的订单,将在几秒后完成。你应该只设置 worker.runThread(false);然后加入 thead(可能在两个单独的循环中)。该解决方案真正等待工人完成订单。中断在第 582 行休眠的线程将引发 InterruptedException 并跳过以下行,特别是更新 inPrime 和 in Normal 计数器生成不可预测行为的第 596 或 600 行。
将第 582 行移动到第 633 行之后并删除第 173 行将解决问题
如何测试:
我的建议是引入一个计算供应商生成的所有 Packs 盒子的计数器和一个计算所有订购盒子的计数器,最后检查生成的盒子是否等于订购时加上留在妓院的盒子。
代码存在一些并发问题,但主要错误与它们无关:它在 placeOrder
的第 512 行开始的块中
//Enumerate all non prime orders in the list.
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}
当缓冲区只有一个正常顺序时,则inPrime
值为0,inNormal
为1,buffer[0]
为正常顺序,其余缓冲区为空。
移动非引物订单的代码,从索引 0 开始,然后执行:
buffer[1] = buffer[0] //normal order in 0 get copied to 1
buffer[2] = buffer[1] //now its in 1, so it gets copied to 2
buffer[3] = buffer[2] //now its in 2 too, so it gets copied to 3
....
所以它将正常顺序移动到 buffer[1]
但随后它复制了用该顺序填充所有缓冲区的内容。
要解决这个问题,您应该以相反的顺序复制数组:
//Enumerate all non prime orders in the list.
for (int i = (sizeOrderList-1); i > inPrime; i--) {
//Move the non prime order back 1 position in the list.
buffer[i] = buffer[i-1];
}
关于并发问题:
- 如果您检查一个线程上的字段,由另一个线程更新,您应该将其声明为
volatile
。 DispatcherWorker
和 ResourceSupplier
中的 run
字段就是这种情况。参见:
- 当调度程序线程仍在处理包时,您开始中断它们(第 183 行)。因此,如果它们在 573、574 或 579 停止,它们将抛出一个
InterruptedException
并且不会完成处理(因此在最后的代码中并不总是所有的包都被交付)。您可以通过在开始中断调度程序线程之前检查缓冲区是否为空来避免这种情况,在 175 上调用 warehouse.notFullBuffer.acquire(warehouse.sizeOrderList);
- 在捕获
InterruptedException
时,您应该始终调用 Thread.currentThread().interrupt();
以保持线程的中断状态。参见:
我正在为一个用于模拟仓库的练习开发多线程应用程序(类似于生产者消费者问题)但是我 运行 在增加消费者数量的程序中遇到了一些麻烦线程使程序以意想不到的方式运行。
代码:
我正在创建一个名为 buyer 的生产者线程,其目标是每次从仓库中准确地订购 10 个订单。为此,他们有一个名为仓库的共享对象,买家可以在该对象上下订单,然后将订单存储在共享对象的缓冲区中。在此之后,买家会休眠一段时间,直到它再次尝试或所有包裹都已购买。执行此操作的代码如下所示:
public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
while(this.packsBought < 10) {
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}
//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);
//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());
//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}
//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}
如您所见,函数placeOrder(Order order);
用于在仓库下订单。此功能负责根据与主要状态相关的一些逻辑将订单放入队列中。该函数如下所示:
public void placeOrder(Order order) {
try{
//halt untill there are enough packs to handle an order.
this.notFullBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
//Insert the order in the buffer depending on prime status.
if (order.isPrime()) {
//prime order, insert behind all prime orders in buffer.
//Enumerate all non prime orders in the list.
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}
// Insert the prime order.
buffer[inPrime++] = order;
} else {
//No prime order, insert behind all orders in buffer.
buffer[inPrime + inNormal++] = order;
}
//Notify the DispatchWorkers that a new order has been placed.
this.notEmptyBuffer.release();
//Catch any InterruptException that might occure.
} catch(InterruptedException e){
//Even though this isn't expected behavior, there is no reason to
//notify the user of this event or to preform any other action as
//the thread will just return to the queue before placing another
//error if it is still required to do so.
} finally {
//Unlock and finalize the critical section.
mutexBuffer.unlock();
}
}
订单由充当消费者线程的工人消费。线程本身包含非常简单的代码循环,直到处理完所有订单。在此循环中,对同一个仓库对象调用不同的函数 handleOrder();
,该对象处理缓冲区中的单个订单。它使用以下代码执行此操作:
public void handleOrder(){
//Create a variable to store the order being handled.
Order toHandle = null;
try{
//wait until there is an order to handle.
this.notEmptyBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
//obtain the first order to handle as the first element of the buffer
toHandle = buffer[0];
//move all buffer elementst back by 1 position.
for(int i = 1; i < sizeOrderList; i++){
buffer[i - 1] = buffer[i];
}
//set the last element in the buffer to null
buffer[sizeOrderList - 1] = null;
//We have obtained an order from the buffer and now we can handle it.
if(toHandle != null) {
int nPacks = toHandle.getnPacks();
//wait until the appropriate resources are available.
this.hasBoxes.acquire(nPacks);
this.hasTape.acquire(nPacks * 50);
//Now we can handle the order (Simulated by sleeping. Although
//in real live Amazon workers also have about 5ms of time per
//package).
Thread.sleep(5 * nPacks);
//Calculate the total time this order took.
long time = System.currentTimeMillis() -
toHandle.getOrderTime();
//Update the total waiting time for the buyer.
toHandle.getBuyer().setWaitingTime(time +
toHandle.getBuyer().getWaitingTime());
//Check if the order to handle is prime or not.
if(toHandle.isPrime()) {
//Decrement the position of which prime orders are
//inserted into the buffer.
inPrime--;
} else {
//Decrement the position of which normal orders are
//inserted into the buffer.
inNormal--;
}
//Print a message informing the user a new order was completed.
System.out.println("An order has been completed for: "
+ toHandle.getBuyer().getName());
//Notify the buyer he has sucsessfully ordered a new package.
toHandle.getBuyer().setPacksBought(
toHandle.getBuyer().getPacksBought() + 1);
}else {
//Notify the user there was a critical error obtaining the
//error to handle. (There shouldn't exist a case where this
//should happen but you never know.)
System.err.println("Something went wrong obtaining an order.");
}
//Notify the buyers that a new spot has been opened in the buffer.
this.notFullBuffer.release();
//Catch any interrupt exceptions.
} catch(InterruptedException e){
//This is expected behavior as it allows us to force the thread to
//revaluate it's main running loop when notifying it to finish
//execution.
} finally {
//Check if the current thread is locking the buffer lock. This is
//done as in the case of an interrupt we don't want to execute this
//code if the thread interrupted doesn't hold the lock as that
//would result in an exception we don't want.
if (mutexBuffer.isHeldByCurrentThread())
//Unlock the buffer lock.
mutexBuffer.unlock();
}
}
问题:
为了验证程序的功能,我使用语句的输出:
System.out.println("An order has been completed for: "
+ toHandle.getBuyer().getName());
来自 handleOrder();
函数。我将整个输出放在一个文本文件中,删除该 println();
语句未添加的所有行并计算行数以了解已处理了多少订单。我希望这个值等于线程数乘以 10,但通常情况并非如此。 运行 测试 我注意到有时它确实有效并且没有问题,但有时一个或多个买家线程收到的订单多于应有的数量。有 5 个买家线程应该有 50 个输出,但我得到 50 到 60 行(订单位置)。
将线程数量增加到 30 会增加问题,现在我可以预期增加多达 50% 的订单,一些线程最多下 30 个订单。
做一些研究这称为数据争用,是由 2 个线程同时访问相同数据而其中 1 个线程写入数据引起的。这基本上改变了数据,使得另一个线程无法使用它期望使用的相同数据。
我的尝试:
我坚信 ReentrantLocks
旨在处理此类情况,因为如果另一个线程尚未离开,它们应该阻止任何线程进入一段代码。 placeOrder(Order order);
和 handleOrder();
函数都使用了这个机制。因此,我假设我没有正确实施。这是项目的一个版本,它可以从一个名为 Test.java 的文件中编译和执行。谁能看一下上面解释的代码并告诉我我做错了什么?
编辑
我注意到买家可以下 10 个以上的订单,所以我将代码更改为:
/*
* The run method which is ran once the thread is started.
*/
public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
for(packsBought = 0; packsBought < 10; packsBought++)
{
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}
//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);
//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());
//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}
//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}
在买家 run();
功能中,但我仍然收到一些下了超过 10 个订单的帖子。我还删除了 handleOrder();
函数中购买包数量的更新,因为现在不需要了。 here 是 Test.java 的更新版本(其中所有 类 都在一起以便于执行)这里似乎有不同的问题。
我相信你可能在追鬼。我不完全确定为什么您看到的输出比您预期的要多,但所下订单的数量似乎是有序的。请允许我澄清一下:
我在 Warehouse
class 中添加了 Map<String,Integer>
来映射每个线程放置的订单数量:
private Map<String,Integer> ordersPlaced = new TreeMap<>();
// Code omitted for brevity
public void placeOrder(Order order)
{
try
{
//halt untill there are enough packs to handle an order.
this.notFullBuffer.acquire();
//Lock to signify the start of the critical section.
this.mutexBuffer.lock();
ordersPlaced.merge(Thread.currentThread().getName(), 1, Integer::sum);
// Rest of method
}
然后我在main
方法中添加了一个for循环来执行代码100次,并在每次迭代的末尾添加以下代码:
warehouse.ordersPlaced.forEach((thread, orders) -> System.out.printf(" %s - %d%n", thread, orders));
我在 lambda 表达式中放置了一个断点,条件为 orders != 10
。在我执行的 100 多次运行中从未触发过这种情况。据我所知,您的代码正在按预期工作。我已经将 nWorkers
和 nBuyers
都增加到 100 只是为了确定。
我相信您使用 ReentrantLock
是正确的,我同意它可能是您用例的最佳选择。
参考你在 pastebin 上的代码 一般问题: 在函数 public void handleOrder() he sleep (line 582) Thread.sleep(5 * nPacks);在 lock(): unlock(): 块内。 在这种睡眠状态下,拥有多个 DispatchWorker 是没有意义的,因为 n-1 将在第 559 行 this.mutexBuffer.lock() 等待,而另一个人正在第 582 行休眠。
错误: 该错误在第 173 行。您应该将其删除。 在您的 main() 中,您加入了所有买家,这是正确的。然后你试图阻止工人。此时的工人已经运行个完成的订单,将在几秒后完成。你应该只设置 worker.runThread(false);然后加入 thead(可能在两个单独的循环中)。该解决方案真正等待工人完成订单。中断在第 582 行休眠的线程将引发 InterruptedException 并跳过以下行,特别是更新 inPrime 和 in Normal 计数器生成不可预测行为的第 596 或 600 行。
将第 582 行移动到第 633 行之后并删除第 173 行将解决问题
如何测试: 我的建议是引入一个计算供应商生成的所有 Packs 盒子的计数器和一个计算所有订购盒子的计数器,最后检查生成的盒子是否等于订购时加上留在妓院的盒子。
代码存在一些并发问题,但主要错误与它们无关:它在 placeOrder
//Enumerate all non prime orders in the list.
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}
当缓冲区只有一个正常顺序时,则inPrime
值为0,inNormal
为1,buffer[0]
为正常顺序,其余缓冲区为空。
移动非引物订单的代码,从索引 0 开始,然后执行:
buffer[1] = buffer[0] //normal order in 0 get copied to 1
buffer[2] = buffer[1] //now its in 1, so it gets copied to 2
buffer[3] = buffer[2] //now its in 2 too, so it gets copied to 3
....
所以它将正常顺序移动到 buffer[1]
但随后它复制了用该顺序填充所有缓冲区的内容。
要解决这个问题,您应该以相反的顺序复制数组:
//Enumerate all non prime orders in the list.
for (int i = (sizeOrderList-1); i > inPrime; i--) {
//Move the non prime order back 1 position in the list.
buffer[i] = buffer[i-1];
}
关于并发问题:
- 如果您检查一个线程上的字段,由另一个线程更新,您应该将其声明为
volatile
。DispatcherWorker
和ResourceSupplier
中的run
字段就是这种情况。参见: - 当调度程序线程仍在处理包时,您开始中断它们(第 183 行)。因此,如果它们在 573、574 或 579 停止,它们将抛出一个
InterruptedException
并且不会完成处理(因此在最后的代码中并不总是所有的包都被交付)。您可以通过在开始中断调度程序线程之前检查缓冲区是否为空来避免这种情况,在 175 上调用 - 在捕获
InterruptedException
时,您应该始终调用Thread.currentThread().interrupt();
以保持线程的中断状态。参见:
warehouse.notFullBuffer.acquire(warehouse.sizeOrderList);