- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我必须完成一项练习,我必须使用至少一个生产者线程和 x 个消费者线程的生产者/消费者模式在我的文件夹路径中查找“.java”文件。
生产者消费者级:首先,当生产者完成查找文件时,我尝试通过设置从 true 到 false 的 while 循环来阻止消费者,但这不起作用。它不起作用,因为线程显然仍在运行,只是没有做任何有用的事情。现在我也使用 closePool() 函数。
所以如果我不忍受我的名为locka的锁,这个函数确实可以工作。这基本上是我不明白的事情。
所以如果我有
loka.lock();
ende = false;
loka.unlock();
和
while(ende){
loka.lock();
System.out.println(xy.getQueue());
loka.unlock();
}
closePool() 函数永远不会被调用。这是我不明白的事情。如果我把锁放在 while 循环中,它就会工作并且线程会停止。
问题:
1) ende 参数无论如何都会被设置为 false,因此锁最终会被释放。
2)其次我只锁定了方法的一部分而不是对象?!据我了解,同一对象中其他方法中的其他代码仍然会同时工作。或者是像synchronized和i这样的锁在处于锁定状态时同步整个对象?根据我的理解,消费者线程中的 while 循环被锁定,但生产者线程仍然会调用 closePool();
额外说明:也许我什至没有以正确的方式设计我的生产者/消费者模式。
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class FindJavaVisitorp extends SimpleFileVisitor<Path> {
private BlockingQueue<String> xxx = new ArrayBlockingQueue<String>(10);
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().endsWith(".java")) {
try {
xxx.put(file.toString());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return FileVisitResult.CONTINUE;
}
public String getQueue() throws InterruptedException {
return xxx.take();
}
}
public class ProducerConsumer {
private volatile boolean ende = true;
private Path path;
private FindJavaVisitorp xy;
private Lock loka = new ReentrantLock();
private ExecutorService pepe;
public ProducerConsumer(Path path, FindJavaVisitorp xy, ExecutorService xyz) {
this.path = path;
this.xy = xy;
pepe = xyz;
}
public void produce() throws IOException, InterruptedException {
Files.walkFileTree(path, xy);
loka.lock();
ende = false;
loka.unlock();
closePool();
}
public void consume() throws InterruptedException {
while (ende) {
loka.lock();
System.out.println(xy.getQueue());
loka.unlock();
}
}
public void closePool() {
pepe.shutdown();
try {
if (!pepe.awaitTermination(60, TimeUnit.SECONDS)) {
pepe.shutdownNow();
if (!pepe.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool couldn't be terminated!");
}
}
} catch (InterruptedException e) {
pepe.shutdownNow();
}
}
}
public class Test {
public static void main(String[] args) {
Path startingDir = Paths.get("/usr/local/");
FindJavaVisitorp x = new FindJavaVisitorp();
ExecutorService exec = Executors.newCachedThreadPool();
final ProducerConsumer pp = new ProducerConsumer(startingDir, x, exec);
exec.submit(new Runnable() {
public void run() {
try {
pp.produce();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
// x.printQueue();
for (int j = 0; j < 5; j++) {
exec.submit(new Runnable() {
public void run() {
try {
pp.consume();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
exec.shutdown();
}
}
最佳答案
。以下代码将尝试保护 loka=false
免受任何并发访问。
loka.lock();
ende = false;//critical section
loka.unlock();
以下代码将不受并发访问的影响,并且与上述临界区互斥。
while(ende){
loka.lock();
System.out.println(xy.getQueue());//critical section
loka.unlock();
}
由于这两个关键部分之间没有任何共同点,因此互斥不起作用。由于 ende
是 volatile 的,用锁保护它不会像 primitive types already have atomic access 那样做任何事情。 .
Reads and writes are atomic for reference variables and for most primitive variables (all types except long and double).
Reads and writes are atomic for all variables declared volatile (including long and double variables).
lock() 和
unlock()` 保护的 block 内的代码才会被锁定,无法进行并发访问。对象本身可以自由地在这些 block 之外执行任何并发(锁定 block )任务。最后遵循正确的命名约定并为变量指定有意义的名称。
主要答案您的线程仍在运行的原因是因为它们正在等待 blockingQueue.takeItem()
并且它们无法从中释放,除非队列再次被填满,但是由于 Producer 已完成,因此不可能发生这种情况。
如何避免这种行为
BlockingQueue 上没有允许 immediate release of waiting threads 的方法我们可以做的一件事是让生产者放置一个 LAST_ITEM,并让消费者检查他们获得的项目是否是 LAST_ITEM,这样他们就可以释放自己。
以下是工作代码。我对变量和方法名称进行了一些修改,使它们更有意义。
JavaFileVisitor
package filevisitor;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class JavaFileVisitor extends SimpleFileVisitor<Path> {
private BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
public static String NO_MORE_ITEMS = "### NO MORE ITEMS ###";
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().endsWith(".java")) {
try {
blockingQueue.put(file.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return FileVisitResult.CONTINUE;
}
public String getQueueItem() throws InterruptedException {
String item = blockingQueue.take();
if(NO_MORE_ITEMS.equals(item)) {
setNoMoreItems();
}
return item;
}
public void setNoMoreItems() {
try {
blockingQueue.put(NO_MORE_ITEMS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
生产者消费者
package filevisitor;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class ProducerConsumer {
private Path path;
private JavaFileVisitor fileVisitor;
public ProducerConsumer(Path path, JavaFileVisitor visitor) {
this.path = path;
this.fileVisitor = visitor;
}
public void produce() throws IOException, InterruptedException {
Files.walkFileTree(path, fileVisitor);
fileVisitor.setNoMoreItems();
}
public void consume() throws InterruptedException {
while (true) {
String item = fileVisitor.getQueueItem();
if(JavaFileVisitor.NO_MORE_ITEMS.equals(item)) {
break;
}
System.out.println(item);
}
}
}
生产者消费者主要
package filevisitor;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerMain {
public static void main(String[] args) {
Path startingDir = Paths.get("src/filevisitor");
JavaFileVisitor fileVisitor = new JavaFileVisitor();
ExecutorService executor = Executors.newCachedThreadPool();
final ProducerConsumer producerConsumer = new ProducerConsumer(startingDir, fileVisitor);
executor.submit(new Runnable() {
public void run() {
System.out.println("Producer started");
try {
producerConsumer.produce();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer finished");
}
});
for (int j = 0; j < 5; j++) {
executor.submit(new Runnable() {
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " Consumer Started");
try {
producerConsumer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " Consumer finished");
}
});
}
executor.shutdown();
System.out.println("Executor shutdown, waiting for threads to finish");
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Exiting main");
}
}
输出
Producer started
pool-1-thread-3 Consumer Started
pool-1-thread-2 Consumer Started
Executor shutdown, waiting for threads to finish
pool-1-thread-5 Consumer Started
pool-1-thread-6 Consumer Started
pool-1-thread-4 Consumer Started
src\filevisitor\JavaFileVisitor.java
src\filevisitor\ProducerConsumerMain.java
src\filevisitor\ProducerConsumer.java
pool-1-thread-6 Consumer finished
pool-1-thread-4 Consumer finished
pool-1-thread-3 Consumer finished
pool-1-thread-5 Consumer finished
Producer finished
pool-1-thread-2 Consumer finished
Exiting main
关于java - java线程生产者-消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34569624/
kafka的Java客户端-消费者 一、kafka消费方式 pull(拉)模式:consumer采用从broker中主动拉取数据。Kafka 采用这种方式 push(推)模式:Kafka没有采用这种方
我编写这个小应用程序是为了解决 Python 中的经典生产者/消费者问题。我知道我可以使用线程安全的队列机制来解决这个问题,但我有兴趣自己解决这个问题来学习。 from threading impor
下面是一个示例消费者/生产者模型的代码: int buffer[MAX]; int fill_ptr = 0; int use_ptr = 0; int count = 3; void put(int
我的消费者、生产者程序有问题,它似乎可以加载,但返回段错误。我已经尝试了一切来修复它,但仍然失败!将不胜感激任何帮助。笔记;代码真的很多,semaphore.h的代码都在里面,有谁想测试一下。其余代码
我正在阅读著名的操作系统概念书(Avi Silberschatz、Peter Baer Galvin、Greg Gagne)第 9 版:http://codex.cs.yale.edu/avi/os-
我正在尝试构建一个服务,为许多异步客户端提供队列以发出请求并等待响应。我需要能够通过每 Y 个持续时间的 X 个请求来限制队列处理。例如:每秒 50 个 Web 请求。它用于第 3 方 REST 服务
我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个 StreamWriter写出它的结果。每个任务还必须有参数传
为什么我们需要 Azure 存储帐户上的 blob 容器用于 Eventhub 消费者客户端(我使用的是 python)。为什么我们不能像在 Kafka 中那样直接使用来自 Eventhub(Kafk
我有一个有趣的生产者-消费者衍生产品需要实现,但我无法理解它的算法。因此,每个生产者都会“产生”给定范围(最小值,最大值)之间的数字,这对除以给定“商”给出了相同的提醒。对于消费者来说也是如此。 额外
我需要实现一种生产者/消费者方案,出于性能原因,消费者尝试在一批中处理许多工作项(每个工作项都会耗尽工作队列)。 目前,我只是创建固定数量的相同工作人员,它们在循环中的同一队列上工作。由于其中一些可能
为什么我们需要 Azure 存储帐户上的 blob 容器用于 Eventhub 消费者客户端(我使用的是 python)。为什么我们不能像在 Kafka 中那样直接使用来自 Eventhub(Kafk
我的关系必须按如下方式运作;线程 A 向线程 B 发布一些更改,线程 B 接受该更改并将其发布到线程 C。 问题是生产者-消费者,我使用 BlockingQueue 仅用两个实体来实现它没有问题。我怎
我一直在研究 PC 问题,以了解 Java 同步和线程间通信。使用底部的代码,输出为 Producer produced-0 Producer produced-1 Producer produced
我编写了代码来实现生产者-消费者问题,它似乎工作正常,不需要同步。这可能吗? 如何测试代码并检查它是否确实正常工作?我如何知道是否会发生死锁?现在,我没有跳出循环(即生产者不断插入,消费者不断在无限循
我必须完成一项练习,我必须使用至少一个生产者线程和 x 个消费者线程的生产者/消费者模式在我的文件夹路径中查找“.java”文件。 生产者消费者级:首先,当生产者完成查找文件时,我尝试通过设置从 tr
我被分配了一项类(class)作业来实现消费者/生产者问题的解决方案,该解决方案使用单个生产者、单个消费者和循环缓冲区。这应该用 C 语言编写。 不幸的是,我们没有获得任何学习 Material ,并
有人可以检查我的代码并告诉我是否走在正确的轨道上。我似乎有点迷失了。如果您看到我的错误,请告诉我它们。 我想做的是使用我自己的信号量以及 GCD 来解决有界缓冲区问题。 提前致谢.. sema.c v
我要处理有界缓冲区、生产者消费者问题,只能修改 prod 和 cons 函数。此代码仅在一个消费者和生产者线程上运行,不会出现任何问题。但对于每个都有多个,迟早总会给我带来同样的问题: p5p1:
我有一个从多个线程访问的类的实例。此类接受此调用并将元组添加到数据库中。我需要以串行方式完成此操作,因为由于某些数据库约束,并行线程可能会导致数据库不一致。 由于我不熟悉 C# 中的并行性和并发性,所
我正在尝试编写一个批量邮件服务,它有两种方法: add(Mail mail):可以发送邮件,由Producers调用 flushMailService():刷新服务。消费者应该获取一个列表,并调用另一
我是一名优秀的程序员,十分优秀!