gpt4 book ai didi

java - JAVA中从服务器套接字缓冲区接收时消除半字符串

转载 作者:行者123 更新时间:2023-12-02 12:26:09 25 4
gpt4 key购买 nike

我在一个程序中编写了一个简单的NIO服务器和内部客户端(在同一个程序内),以便服务器从外部接收数据,内部客户端将服务器接收到的数据发送到外部服务器。我使用 While() 循环在两个并行线程中连续运行这两个进程。现在的问题是,我将以非常高的速度接收数据到内部服务器,并且我收到的所有内容,我将使用内部客户端将它们发送到外部服务器。有时,从缓冲区检索数据会导致总字符串大小的一半。这意味着我收到“HELLO”,但总字符串是“HELLO SERVER”。这只是一个例子。我会收到很长的字符串。同样,通过内部客户端将数据发送到外部服务器后,我将监听数据,并且收到相同的半字符串。有什么方法可以消除这些半字符串并获得全长字符串。我必须毫无失败地获取完整的字符串。

我正在使用 while 循环来处理该过程。这使得 CPU 利用率高达 50%。有什么方法可以在不使用 Thread.sleep 方法的情况下降低 CPU 利用率?因为我需要不断听取外界的数据。他们可能会为一个请求发送 2-4 个字符串。我尝试使用执行程序服务线程连续运行进程,但它需要一些 sleep 。如果我包含一些 sleep ,我将无法获取字符串,如果我不包含 sleep ,我的 CPU 利用率会非常高(50-60%)。谁能帮我解决这两个问题吗?

这是我的代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
static SocketChannel channel;
public static void main(String[] args) throws IOException {

System.out.println("Listening for connections on : 8888"); //8888
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(8888));
channel = serverChannel.accept();
System.out.println("Connected...");
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
ReceiveFromOMS receivefromOMS;
SendToExchange sendExchange;
receivefromOMS = new ReceiveFromOMS();
sendExchange = new SendToExchange();

Thread t1 = new Thread(receivefromOMS);
Thread t2 = new Thread(sendExchange);
t1.start();
t2.start();
}
}

class ReceiveFromOMS extends Thread{
public static SocketChannel channel;
static ByteBuffer buffer = ByteBuffer.allocate(1024);
static ServerSocketChannel serverChannel ;
public static int ReceiveFromOMSPort;
BlockingQueue<String> fromOMSqueue = new LinkedBlockingQueue<>(30);

@Override
public void run(){
while(true){
try {
receiveFromOMS();
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
try {
Thread.sleep(5000);
} catch (InterruptedException ex1) { }
}
}
}

public void receiveFromOMS() throws InterruptedException{
try {
int numRead = -1;
numRead = channel.read(buffer);
while(numRead==0){
numRead = channel.read(buffer);
}
if (numRead == -1) {
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
System.out.println("Connection closed by client: " + remoteAddr);
channel.close();
return;
}
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
fromOMSqueue.add(new String(data));
String msg = fromOMSqueue.poll();
System.out.println("OutGoing To Exchange>> " + msg);
SendToExchange.sendToEchange(msg);
buffer.flip();
buffer.clear();

} catch (IOException ex) {
System.err.println(ex.getMessage());
Thread.sleep(5000);
}
}
}


class SendToExchange extends Thread{
static SocketChannel channel;
static ByteBuffer bb = ByteBuffer.allocateDirect(1024);
static Charset charset = Charset.forName("UTF-8");
public byte[] data;
public static String message;

@Override
public void run(){
try {
while(true){
receive();
Thread.sleep(100);
}
} catch (IOException | InterruptedException ex) {
System.err.println(ex.getMessage());
try {
Thread.sleep(5000);
} catch (InterruptedException ex1) {}
}
}

public static void sendToEchange(String msg){
try {
bb = stringToByteBuffer(msg, charset);
channel.write(bb);
} catch (IOException ex) {
System.err.println(ex.getMessage());
}
}
public void receive() throws IOException {

ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

int numRead = -1;
numRead = channel.read(buffer);
while (numRead == 0) {
numRead = channel.read(buffer);
}

if (numRead == -1) {
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
System.out.println("Connection closed by Exchange: " + remoteAddr);
channel.close();
return;
}
buffer.flip();
data = new byte[numRead];
buffer.get(data);
message = new String(data);
System.out.println("Incoming from Exchange>> " + message);
buffer.clear();

}
public static ByteBuffer stringToByteBuffer(String msg, Charset charset){
return ByteBuffer.wrap(msg.getBytes(charset));
}
}

最佳答案

假设您的服务器正在向每个字符串附加一个消息结束标记字符串,例如"<EOM>" ,然后对代码进行以下修改(将其视为草图,因为我没有完全验证它)可用于等待完整的字符串:

String end = "<EOM>";
StringBuilder curStr = new StringBuilder();

int numRead = 0;
while(-1 != (numRead = channel.read(buffer))){
curStr.append(new String(buffer.array(), 0, numRead, Charset.forName("UTF-8")));
int endIdx = curStr.indexOf(end);
if (endIdx != -1) {
fromOMSqueue.add(curStr.substring(0, endIdx + end.length()));
break;
}
}

if (numRead == -1) {
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
System.out.println("Connection closed by client: " + remoteAddr);
channel.close();
return;
}

String msg = fromOMSqueue.poll();

关于java - JAVA中从服务器套接字缓冲区接收时消除半字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45484567/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com