- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue 以及 Observer 来实现多线程的正确有效方法。我将在下面提供示例代码,但首先让我快速解释一下我认为它是如何工作的,也许这会有所帮助。
我拥有的第一件事是一个 BlockingQueue,所有任务都通过 add(Task task) 方法添加到该队列中。创建类后,将调用 run 方法,并使用 while(true) 调用 take 来阻塞队列,直到有内容添加到任务队列中。
一旦将某些内容添加到 run() 内的队列中,queue.take() 就会返回队列中的项目。然后我将这个项目传递给 WorkerThread 类来处理它。该workerThread被添加到CompletionService池中,该池处理等待线程完成的情况。
好的,现在是我不确定是否正确的部分。我还有一个实现 runnable 的内部类,并在类初始化时启动。它的工作是永远循环调用 pool.take()。因此,这本质上是等待其中一个 WorkerThread 完成。我让完成服务处理这个问题。一旦 take() 获取值,内部类就会将其传递给通知观察者方法。
这个实现可以吗?让我有点担心的是,主类在任务队列上运行 while(true) 循环,并且内部类也在池上循环等待从 WorkerThread 接收结果?
这是一个示例实现。你觉得怎么样?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}
最佳答案
根据我的经验,你的解决方案似乎没问题。我有三点意见/建议:
responseWorkerThread = new Thread(schedulerWorker)
和 responseWorkerThread.start()
后,您基本上就已经分解了这两个循环。这部分看起来还不错。您似乎确实正确使用了 Executor 的 API,但看起来您可能需要更多代码来停止 HttpScheduledWorker 线程并关闭 ExecutionCompletionService
作为 HttpSchedulerThreaded
类的一部分。队列
。 ExecutionCompletionService
已使用 BlockingQueue
来管理提交给它的任务。关于Java正确使用ExecutorService、CompletionService、BlockingQueue和Observer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7286945/
我的理解是1.5加入了callable,runnable接口(interface)保持原样,防止世界末日。为什么我不能实例化 ThreadPoolExecutor (core, max, tu, un
我正在制作一个包含两个线程的应用程序:其中一个向 LinkedBlockingQueue 写入一个值,另一个正在读取。我正在使用 ScheduledExecutorService 在某个时间段内以秒为
我正在尝试 Autowiring 参数化的阻塞队列: @Bean(name = "saveProductQueue") public BlockingQueue saveProductQueue()
我正在实现一个程序,其中主线程将各种消息推送到工作线程,而工作线程将工作结果推送回主线程。 为此,我计划使用两个队列,一个用于推送到工作线程,另一个用于从中拉出。 据我了解,线程会缓存对象,因此如果它
我正在使用 RabbitMQ,它默认为消费者使用 LinkedBlockingQueue 。它有一个阻塞的 nextDelivery() 方法,该方法基本上调用队列上的 take() 。 但是如果在调
我有 n 个生产者线程通过 BlockingQueue 为 1 个消费者线程提供数据。我正在使用 .put 和 .take (后者当 .peek != null 时)。除了明显在传输过程中不可避免的数
我已经实现了一个带有套接字线程池的两人游戏。每个玩家都连接到自己的线程。我按照this添加了一个消息队列系统文章。 问题是消息滞后。第一个玩家的第一个响应会按预期添加到 messageQueue 中。
据我所知,BlockingCollection 使用非繁忙等待,这是对新项目/回调的通知。所以我不明白它是如何阻塞的,但我认为我可能混合了阻塞的线程和阻塞的共享对象访问? 最佳答案 这是一个很好的解释
我的 Android 应用程序有一个长时间运行的后台服务,据我所知,它在应用程序的主线程中运行,因此,任何耗时或阻塞的任务都应移至单独的线程。 现在,情况是这样的,我不明白/困惑: 当我从一个 Act
我在数据库前面使用 LinkedBlockingQueue。一个线程写入队列,另一个线程从队列读取。 我认为两个并发写入是不可能的。但是是否有可能一个线程写入而另一个线程同时从队列中读取呢?如果没有,
我有一个在单个后台线程上处理工作事件的 BlockingQueue。各种线程调用 add 将一些工作添加到队列中,单个后台线程调用 take 获取工作并一次处理一个。最终可能是停止处理工作的时候了,我
我在多线程系统中使用 BlockingQueue,其中同步块(synchronized block)将项目添加到列表中。有时它不会将项目添加到列表中,它遗漏的项目是随机的。我尝试将以下行添加到代码中,
我有以下阻塞队列; final BlockingQueue blockingQueue = new LinkedBlockingQueue(); 在哪里 public class Message
这个问题的标题让我怀疑这是否存在,但仍然: 我感兴趣的是是否有 Java 的 BlockingQueue 的实现,它受大小限制,从不阻塞,而是在尝试入队太多元素时抛出异常。 编辑 - 我将 Block
假设我有 BlockingQueue 并且一些线程被称为 take() 但此时队列是空的。假设我以某种方式知道将来不会有新元素出现在队列中。如何释放那些被称为 take() 的线程等待?谢谢! p
我想要一个线程安全的容器,它会阻止调用者,直到有项目可用为止。项目将以每秒 1000 秒的速度添加到此容器中,但不会以相同的速度排出。因此,我希望容器不允许重复。我围绕 LinkedBlockingQ
我正在使用 BlockingQueue(LinkedBlockingQueue) 在多个线程之间同步数据。请看下图。 主线程是一个生产者,它生产对象,然后将它们放入每个消费者的队列中(线程2-10)。
我有一个容量为 1 的 BlockingQueue。它存储收到的股票的最后价格。价格保留在队列中,直到客户端轮询队列。然后,我有一个名为 getLatestPrice() 的方法,它应该返回股票的最新
我的 Java 代码中使用 BlockingQueue 时存在潜在的竞争条件,我想知道如何修改代码来避免这种情况: private static BlockingQueue ftpQueue = ne
我有一种情况需要 BlockingQueue 上的方法 peekWait。我会将此方法描述为 retrieves but not remove the head of the queue, waiti
我是一名优秀的程序员,十分优秀!