gpt4 book ai didi

java - fork 线程并没有加快我的程序,我在我的情况下是否正确使用它?

转载 作者:行者123 更新时间:2023-11-29 07:15:11 25 4
gpt4 key购买 nike

我想我用错了线程,所以我想问一下这是否是好的设计。基本上我有一个程序从队列中提取数据然后处理它(处理是纯数学所以它是 100% cpu 密集型),然后如果数据是好的它发送到一个“好”队列否则它要么完全丢弃要么部分丢弃被发送回初始“工作”队列以进行进一步处理。这是它的高级逻辑,当我的队列在内存中时,我的程序使用了所有内核并且速度非常快。随着我的数据的增长,我决定使用队列服务器来存储队列,然后将处理分布到多台机器上,但现在速度很慢(每个内核只使用了 40%-60%)。

我尝试分析我的代码(使用 yourkit 和 netbeans 中的内置代码),它说大部分时间 (80%) 都花在了队列程序上。我想我可以通过将所有外部程序内容推送到另一个线程来让我的程序中的数字运算不断进行,但这对性能没有帮助,我想知道我是否做错了。我不确定,但我想知道如果从现有线程(父线程)启动一个线程(子线程),子线程是否必须在父线程完成之前完成?

我的代码非常大,99% 都不需要,所以我只写一个高级版本(它可能无法编译,但应该让您了解我在做什么)。

public class worker {

private static ExecutorService executor;
static {
final int numberOfThreads = 4;
executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 1000, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());    
}
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
// TODO Auto-generated method stub
System.out.println("starting worker..");
//Connection information goes here
channel.basicQos(50); //this is part of the connection, the queue server only gives 50 messages without acknowledgment

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //gets data from queue
String message = new String(delivery.getBody());  
executor.submit(new DoWork(channel, message, delivery));
}

class DoWork implements Runnable{ //where the main work happens
//setup variables, basically passing queue connection information as well as data here, so I only need to rely on one connection    
public void run() {
new Thread(new Awk_to_Queue(channel, delivery)).start(); //this sends an Awk_to_Queue to the queue, I launch a thread for this so my program can keep working.

if (data is good) {
new Thread(new Send_to_Queue("success_queue", message1, channel)).start();                
continue;
}  else if (Data is not good but not bad either ) {
new Thread(new Send_to_Queue("task_queue", message2, channel)).start();             


class Send_to_Queue implements Runnable{       
public void run() {
//takes data in and sends to queue in the way I used to previous do it, but just does it in a thread. queue connection is passed over so I only need to have one connection
}
}


class Awk_to_Queue implements Runnable{
public void run() {
//awk's queue so queue server can send one more piece of data to queue up
}
}

就是这样。如果它有点难以阅读,我很抱歉(我删除了很多东西只是为了向您展示我正在做的事情的结构)。 fork 线程不影响速度我做错了什么(它没有看到它运行得更快,分析器的结果也没有改变)?是我 fork 线程的方式有问题( new Thread(new Awk_to_Queue(channel, delivery)).start();)还是像我的线程设计一样?

最佳答案

我想到了两件事:

1) 唯一读取 远程队列的线程似乎是在 main() 方法中运行无限循环的主线程。无论您将东西塞入其中的速度有多快,处理它们的速度永远不会快于取出它们的速度。

2) 产生 new Thread(); 是一个“昂贵”的操作。不断地为单个短任务创建新线程只是在内存分配和 native 资源之间搅动。您应该将这些“队列放置”卸载到第二个 ExecutorService,您可以调整其大小,而不是生成无限数量的线程。

关于java - fork 线程并没有加快我的程序,我在我的情况下是否正确使用它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10182465/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com