gpt4 book ai didi

java - 线程执行太多次并导致竞争条件,即使我正在使用锁

转载 作者:行者123 更新时间:2023-12-03 12:45:45 25 4
gpt4 key购买 nike

我正在为一个用于模拟仓库的练习(类似于生产者消费者问题)开发一个多线程应用程序,但是我在程序中遇到了一些问题,因为增加消费者线程的数量会使程序的行为意想不到的方式。

代码:

我正在创建一个名为 buyer 的生产者线程,其目标是每次从仓库准确地订购 10 个订单。为此,他们有一个名为仓库的共享对象,买家可以在该对象上下订单,然后将订单存储在共享对象的缓冲区中。在此之后,买家会 hibernate 一段时间,直到它再次尝试或所有包裹都已购买。执行此操作的代码如下所示:

public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
while(this.packsBought < 10) {
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}

//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);

//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());

//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}

//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}

如您所见,函数 placeOrder(Order order); 用于在仓库下订单。此功能负责根据与主要状态相关的一些逻辑将订单放入队列中。该函数如下所示:

public void placeOrder(Order order) {
try{
//halt untill there are enough packs to handle an order.
this.notFullBuffer.acquire();

//Lock to signify the start of the critical section.
this.mutexBuffer.lock();

//Insert the order in the buffer depending on prime status.
if (order.isPrime()) {
//prime order, insert behind all prime orders in buffer.

//Enumerate all non prime orders in the list.
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}

// Insert the prime order.
buffer[inPrime++] = order;

} else {
//No prime order, insert behind all orders in buffer.
buffer[inPrime + inNormal++] = order;
}
//Notify the DispatchWorkers that a new order has been placed.
this.notEmptyBuffer.release();

//Catch any InterruptException that might occure.
} catch(InterruptedException e){
//Even though this isn't expected behavior, there is no reason to
//notify the user of this event or to preform any other action as
//the thread will just return to the queue before placing another
//error if it is still required to do so.
} finally {
//Unlock and finalize the critical section.
mutexBuffer.unlock();
}
}

订单由充当消费者线程的 worker 消费。线程本身包含非常简单的代码循环,直到处理完所有订单。在这个循环中,不同的函数 handleOrder(); 被调用在同一个仓库对象上,该对象处理缓冲区中的单个订单。它使用以下代码执行此操作:

public void handleOrder(){
//Create a variable to store the order being handled.
Order toHandle = null;

try{
//wait until there is an order to handle.
this.notEmptyBuffer.acquire();

//Lock to signify the start of the critical section.
this.mutexBuffer.lock();

//obtain the first order to handle as the first element of the buffer
toHandle = buffer[0];

//move all buffer elementst back by 1 position.
for(int i = 1; i < sizeOrderList; i++){
buffer[i - 1] = buffer[i];
}
//set the last element in the buffer to null
buffer[sizeOrderList - 1] = null;

//We have obtained an order from the buffer and now we can handle it.
if(toHandle != null) {
int nPacks = toHandle.getnPacks();

//wait until the appropriate resources are available.
this.hasBoxes.acquire(nPacks);
this.hasTape.acquire(nPacks * 50);

//Now we can handle the order (Simulated by sleeping. Although
//in real live Amazon workers also have about 5ms of time per
//package).
Thread.sleep(5 * nPacks);

//Calculate the total time this order took.
long time = System.currentTimeMillis() -
toHandle.getOrderTime();

//Update the total waiting time for the buyer.
toHandle.getBuyer().setWaitingTime(time +
toHandle.getBuyer().getWaitingTime());

//Check if the order to handle is prime or not.
if(toHandle.isPrime()) {
//Decrement the position of which prime orders are
//inserted into the buffer.
inPrime--;
} else {
//Decrement the position of which normal orders are
//inserted into the buffer.
inNormal--;
}

//Print a message informing the user a new order was completed.
System.out.println("An order has been completed for: "
+ toHandle.getBuyer().getName());

//Notify the buyer he has sucsessfully ordered a new package.
toHandle.getBuyer().setPacksBought(
toHandle.getBuyer().getPacksBought() + 1);
}else {
//Notify the user there was a critical error obtaining the
//error to handle. (There shouldn't exist a case where this
//should happen but you never know.)
System.err.println("Something went wrong obtaining an order.");
}

//Notify the buyers that a new spot has been opened in the buffer.
this.notFullBuffer.release();

//Catch any interrupt exceptions.
} catch(InterruptedException e){
//This is expected behavior as it allows us to force the thread to
//revaluate it's main running loop when notifying it to finish
//execution.
} finally {
//Check if the current thread is locking the buffer lock. This is
//done as in the case of an interrupt we don't want to execute this
//code if the thread interrupted doesn't hold the lock as that
//would result in an exception we don't want.
if (mutexBuffer.isHeldByCurrentThread())
//Unlock the buffer lock.
mutexBuffer.unlock();
}
}

问题:

为了验证程序的功能,我使用语句的输出:

System.out.println("An order has been completed for: " 
+ toHandle.getBuyer().getName());

来自 handleOrder(); 函数。我将整个输出放在一个文本文件中,删除所有未由该 println(); 语句添加的行并计算行数以了解已处理了多少订单。我希望这个值等于线程数乘以 10,但通常情况并非如此。运行测试 我注意到有时它确实有效并且没有问题,但有时一个或多个买家线程接受的订单多于他们应有的数量。有 5 个买家线程应该有 50 个输出,但我得到 50 到 60 行(订单位置)。

将线程数量增加到 30 会增加问题,现在我可以预期增加多达 50% 的订单,一些线程最多可下 30 个订单。

做一些研究,这被称为数据争用,是由 2 个线程同时访问相同数据而其中 1 个线程写入数据引起的。这基本上改变了数据,使得另一个线程无法处理它期望处理的相同数据。

我的尝试:

我坚信 ReentrantLocks 是为处理这种情况而设计的,因为如果另一个线程还没有离开,它们应该阻止任何线程进入一段代码。 placeOrder(Order order);handleOrder(); 函数都使用了这种机制。因此,我假设我没有正确实现。这是该项目的一个版本,它可以从一个名为 Test.java 的文件中编译和执行。 .任何人都可以看一下上面解释的代码并告诉我我做错了什么吗?

编辑

我注意到买家可以下 10 个以上的订单,所以我将代码更改为:

/*
* The run method which is ran once the thread is started.
*/
public void run() {
//Run until the thread has bought 10 packages, this ensures the thread
//will eventually stop execution automatically.
for(packsBought = 0; packsBought < 10; packsBought++)
{
try {
//Sleep for a random amount of time between 1 and 50
//milliseconds.
Thread.sleep(this.rand.nextInt(49) + 1);
//Catch any interruptExceptions.
} catch (InterruptedException ex) {
//There is no problem if this exception is thrown, the thread
//will just make an order earlier than planned. that being said
//there should be no manner in which this exception is thrown.
}

//Create a new order.
Order order = new Order(this.rand.nextInt(3)+ 1,
this,
this.isPrime);

//Set the time at which the order was placed as now.
order.setOrderTime(System.currentTimeMillis());

//place the newly created order in the warehouse.
this.warehouse.placeOrder(order);
}

//Notify the thread has finished execution.
System.out.println("Thread: " + super.getName() + " has finished.");
}

在买家 run(); 函数中,我仍然收到一些线程,这些线程下了超过 10 个订单。我还删除了 handleOrder(); 函数中购买包数量的更新,因为现在不需要了。 here是 Test.java 的更新版本(所有类都放在一起以便于执行)这里似乎有不同的问题。

最佳答案

代码存在一些并发问题,但主要错误与它们无关:它在 placeOrder 的第 512 行开始的 block 中

            //Enumerate all non prime orders in the list. 
for (int i = inPrime; i < sizeOrderList - 1; i++) {
//Move the non prime order back 1 position in the list.
buffer[i + 1] = buffer[i];
}

当缓冲区中只有一个正常顺序时,则inPrime值为0,inNormal为1,buffer[0]为正常顺序,缓冲区的其余部分为空。

移动非引物订单的代码,从索引 0 开始,然后执行:

buffer[1] = buffer[0]  //normal order in 0 get copied to 1
buffer[2] = buffer[1] //now its in 1, so it gets copied to 2
buffer[3] = buffer[2] //now its in 2 too, so it gets copied to 3
....

因此它将正常顺序移动到 buffer[1] 但随后它复制内容以该顺序填充所有缓冲区。

要解决这个问题,您应该以相反的顺序复制数组:

            //Enumerate all non prime orders in the list.
for (int i = (sizeOrderList-1); i > inPrime; i--) {
//Move the non prime order back 1 position in the list.
buffer[i] = buffer[i-1];
}

至于并发问题:

  • 如果您检查一个线程上的字段,由另一个线程更新,您应该将其声明为volatileDispatcherWorkerResourceSupplier 中的 run 字段就是这种情况。请参阅:https://stackoverflow.com/a/8063587/11751648
  • 当调度程序线程仍在处理包时,您开始中断它们(第 183 行)。因此,如果它们在 573、574 或 579 处停止,它们将抛出一个 InterruptedException 并且不会完成处理(因此在最后的代码中并不总是所有包都已交付)。您可以通过在开始中断调度程序线程之前检查缓冲区是否为空来避免这种情况,调用 warehouse.notFullBuffer.acquire(warehouse.sizeOrderList); on 175
  • 捕获InterruptedException 时,您应该始终调用Thread.currentThread().interrupt(); 以保持线程的中断状态。请参阅:https://stackoverflow.com/a/3976377/11751648

关于java - 线程执行太多次并导致竞争条件,即使我正在使用锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65680795/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com