gpt4 book ai didi

java - 如何实现一个可以被多个线程处理的队列?

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:22:40 24 4
gpt4 key购买 nike

我觉得我做错了。我正在创建线程,这些线程应该处理共享队列中的一些数据。我的问题是程序运行缓慢且占用大量内存,我怀疑队列可能没有像我希望的那样共享。我怀疑这是因为在我的代码中我添加了一行显示队列的大小,如果我启动 2 个线程然后我得到两个数字完全不同的输出并且似乎自行递增(我认为它可能是相同的数字但是也许它从 100 跳到 2 等等,但在观看之后它显示 105 和 5 并且以不同的速率移动。如果我有 4 个线程,那么我会看到 4 个不同的数字)。

这是相关部分的片段。我在程序顶部的队列中用我想要的数据创建了一个静态类

static class queue_class {
int number;
int[] data;
Context(int number, int[] data) {
this.number = number;
this.data = data;
}
}

然后我在将一些作业发送到可调用对象后创建队列..

static class process_threaded implements Callable<Void> {
// queue with contexts to process
private Queue<queue_class> queue;

process_threaded(queue_class request) {
queue = new ArrayDeque<queue_class>();
queue.add(request);
}

public Void call() {
while(!queue.isEmpty()) {
System.out.println("in contexts queue with a size of " + queue.size());
Context current = contexts.poll();
//get work and process it, if it work great then the solution goes elsewhere
//otherwise, depending on the data, its either discarded or parts of it is added back to queue
queue.add(new queue_class(k, data_list));

如您所见,数据有 3 个选项,如果数据良好则发送,如果数据非常糟糕则丢弃或发送回队列。我认为当它被发回时队列正在运行,但我怀疑是因为每个线程都在自己的队列而不是共享队列上工作。

这个猜测是否正确,我做错了吗?

最佳答案

您的评估是正确的,每个线程(可能)都在使用自己的队列,因为您在 Callable 的构造函数中创建了一个队列。 . (实际上有一个 Callable<Void> 很奇怪——不就是一个 Runnable 吗?)

还有其他问题,例如,您使用的队列不是线程安全的,或者您的代码无法按编写的方式进行编译。

不过,重要的问题是您真的首先需要显式创建队列吗?为什么不用 ExecutorService您向其提交 Callable s(或 Runnables,如果您决定进行该切换):将对执行程序的引用传递给您的 Callable s,他们可以添加新的 Callable s 到执行者要运行的任务队列。无需重新发明轮子。

例如:

static class process_threaded implements Runnable {
// Reference to an executor
private final ExecutorService exec;
// Reference to the job counter
private final AtomicInteger jobCounter;
// Request to process
private queue_class request;

process_threaded( ExecutorService exec, AtomicInteger counter, queue_class request) {
this.exec = exec;
this.jobCounter = counter;
this.jobCounter.incrementAndGet(); // Assuming that you will always
// submit the process_threaded to
// the executor if you create it.
this.request = request;
}

public run() {
//get work and process **request**, if it work great then the solution goes elsewhere
//otherwise, depending on the data, its either discarded or parts of are added back to the executor
exec.submit( new process_threaded( exec, new queue_class(k, data_list) ) );

// Can do some more work

// Always run before returning: counter update and notify the launcher
synchronized(jobCounter){
jobCounter.decrementAndGet();
jobCounter.notifyAll();
}
}
}

编辑:

要解决何时关闭执行程序的问题,我认为最简单的解决方案是拥有一个作业计数器,并在它达到 0 时关闭。为了线程安全 AtomicInteger可能是最好的选择。我在上面添加了一些代码来合并更改。那么您的启动代码将如下所示:

void theLauncher() {

AtomicInteger jobCounter = new AtomicInteger( 0 );

ExecutorService exec = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcesses());

exec.submit( new process_threaded( exec, jobCounter, someProcessRequest ) );
// Can submit some other things here of course...

// Wait for jobs to complete:
for(;;jobCounter.get() > 0){
synchronized( jobCounter ){ // (I'm not sure if you have to have the synchronized block, but I think this is safer.
if( jobCounter.get() > 0 )
jobCounter.wait();
}
}

// Now you can shutdown:
exec.shutdown();
}

关于java - 如何实现一个可以被多个线程处理的队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10150943/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com