gpt4 book ai didi

java - 具有消费者-生产者设计的 TCP 套接字服务器 'cpu time limit exceeded'

转载 作者:行者123 更新时间:2023-11-30 06:46:41 24 4
gpt4 key购买 nike

我用生产者/消费者设计实现了一个套接字服务器,运行时程序崩溃,日志中显示错误 cpu time limit exceeded。我还发现当时 CPU 使用率超过 90%。下面是服务器的代码,它可能出了什么问题?我该如何优化它?

我使用这种队列方法来避免为每个请求创建如此多的线程

在main方法(主线程)中

//holds socket instances
ConcurrentLinkedQueue<Socket> queue = new ConcurrentLinkedQueue<>();

//create producer thread
Thread producer = new Thread(new RequestProducer(queue));
//create consumer thread
Thread consumer = new Thread(new RequestConsumer(queue));

producer.start();
consumer.start();

RequestProducer线程

//this holds queue instance coming from main thread
ConcurrentLinkedQueue<Socket> queue

//constructor, initiate queue
public RequestProducer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}

public void run() {
try {
//create serversocket instance on port 19029
ServerSocket serverSocket = new ServerSocket(19029);
while (true) {
try {
//keep accept connections
Socket socket = serverSocket.accept();
//add socket to queue
queue.offer(socket);
} catch (ConnectException ce) {//handle exception
} catch (SocketException e) {//handle exception
}
}
} catch (IOException ex) {//handle exception}
}

RequestConsumer线程

//this holds queue instance coming from main thread, same as requestproducer
ConcurrentLinkedQueue<Socket> queue

//constructor, initiate queue
public RequestConsumer(
ConcurrentLinkedQueue<Socket> queue
) {
this.queue = queue;
}

public void run() {
try {
Socket socket = null;
while (true) {
//get head of the queue (socket instance)
socket = queue.poll();
if (null != socket) {
//process data stream
String in = DataStreamUtil.parseAsciiSockStream(socket.getInputStream());
//close socket conection
socket.close();
//excecute database insert of processed data
excecuteDbInsert(in);
}
}
} catch (IOException | ParseException ex) {//handle exceptions}
}

数据流解析器

public static String parseAsciiSockStream(InputStream in) throws IOException {
StringBuilder builder = new StringBuilder();
if (null != in) {
byte[] b = new byte[BYTE_STREAM_MAX];
int length = in.read(b);
for (int i = 0; i < length; i++) {
builder.append((char) (int) b[i]);
}
in.close();
}
return builder.toString();
}

最佳答案

之所以超出 CPU 时间限制,是因为消费者中的 while(true) 循环在队列为空时仍然不停地调用 poll() 空转(忙等待)。下面是解决该问题的示例。

您可以在消费者的 while 循环中添加简单的 Thread.sleep(1) 或使用等待/通知模式来限制 CPU 消耗。

RequestProducer线程

import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Producer side of the server: accepts TCP connections on port 19029 and
 * places every accepted {@link Socket} on the shared queue, then signals the
 * queue's monitor so a waiting consumer can pick the socket up.
 */
public class RequestProducer implements Runnable {
    /** Port the listening socket is bound to. */
    private static final int PORT = 19029;

    /** Queue handed in by the main thread; it is also the monitor used for notify(). */
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue that accepted sockets are offered to
     */
    public RequestProducer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    /**
     * Binds the listening socket and accepts connections until the socket is
     * closed or a non-socket I/O error occurs.
     */
    @Override
    public void run() {
        // try-with-resources releases the listening port when the loop ends for any reason.
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    // Enqueue and signal while holding the monitor, so a consumer that
                    // checks the queue under the same monitor cannot miss the hand-off.
                    synchronized (queue) {
                        queue.offer(socket);
                        System.out.println("notifying");
                        queue.notify();
                    }
                } catch (SocketException e) {
                    // ConnectException is a SocketException, so this covers both original handlers.
                    if (serverSocket.isClosed()) {
                        // accept() would now fail immediately on every call; retrying would only spin.
                        break;
                    }
                    System.err.println("Failed to accept connection on port " + PORT + ": " + e);
                }
            }
        } catch (IOException ex) {
            System.err.println("Request producer on port " + PORT + " stopped: " + ex);
        }
    }
}

RequestConsumer线程

import java.io.IOException;
import java.net.Socket;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Consumer side of the server: takes sockets off the shared queue, reads the
 * request text from each one, and closes it. The thread blocks on the queue's
 * monitor while the queue is empty instead of polling in a tight loop.
 */
public class RequestConsumer implements Runnable {
    /** Queue handed in by the main thread; it is also the monitor used for wait(). */
    final ConcurrentLinkedQueue<Socket> queue;

    /**
     * @param queue queue that sockets are polled from
     */
    public RequestConsumer(ConcurrentLinkedQueue<Socket> queue) {
        this.queue = queue;
    }

    /**
     * Processes queued sockets one at a time until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Socket socket = awaitSocket();
                System.out.println("Acquired new socket");
                handle(socket);
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Request consumer interrupted, stopping");
        }
    }

    /**
     * Returns the next queued socket, waiting on the queue's monitor while there is none.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private Socket awaitSocket() throws InterruptedException {
        synchronized (queue) {
            Socket socket;
            // Check the queue BEFORE waiting and again after every wake-up: a socket that
            // was queued while this thread was busy is taken at once, and a wake-up that
            // finds the queue empty simply waits again.
            while ((socket = queue.poll()) == null) {
                System.out.println("Waiting for new socket");
                queue.wait();
            }
            return socket;
        }
    }

    /**
     * Reads the request text from one socket and prints it; the socket is always closed.
     * A failure on one connection is reported and does not stop the consumer.
     */
    private void handle(Socket socket) {
        try (Socket connection = socket) {
            String in = DataStreamUtil.parseAsciiSockStream(connection.getInputStream());
            // Placeholder for the real work on the parsed text (e.g. excecuteDbInsert(in)).
            System.out.println(in);
        } catch (IOException ex) {
            System.err.println("Failed to process connection: " + ex);
        }
    }
}

数据流解析器

import java.io.IOException;
import java.io.InputStream;

/**
 * Helper for turning the bytes of a socket input stream into text.
 */
public class DataStreamUtil {
    /**
     * Maximum number of bytes read from a stream.
     * NOTE(review): the original listing uses BYTE_STREAM_MAX without declaring it;
     * 4096 is a placeholder so the class compiles - confirm the intended limit.
     */
    private static final int BYTE_STREAM_MAX = 4096;

    /**
     * Performs a single read of at most {@code BYTE_STREAM_MAX} bytes from {@code in},
     * converts each byte to one character, and closes the stream.
     *
     * <p>Only one {@code read} call is made, so the result holds whatever that call
     * delivered; it is not guaranteed to be the peer's complete message.
     *
     * @param in stream to read from; may be {@code null}
     * @return the characters read, or an empty string if {@code in} is {@code null}
     *         or the stream is already at end of input
     * @throws IOException if reading or closing the stream fails
     */
    public static String parseAsciiSockStream(InputStream in) throws IOException {
        if (in == null) {
            return "";
        }
        // try-with-resources closes the stream even when read() throws.
        try (InputStream stream = in) {
            byte[] buffer = new byte[BYTE_STREAM_MAX];
            System.out.println("Waiting for input");
            int length = stream.read(buffer); // -1 at end of stream
            System.out.println("Got input");
            StringBuilder builder = new StringBuilder(Math.max(length, 0));
            for (int i = 0; i < length; i++) {
                // Mask to 0..255: a plain (char) cast sign-extends bytes >= 0x80 to U+FF80..U+FFFF.
                builder.append((char) (buffer[i] & 0xFF));
            }
            return builder.toString();
        }
    }
}

关于java - 具有消费者-生产者设计的 TCP 套接字服务器 'cpu time limit exceeded',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47235963/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com