gpt4 book ai didi

用于继续流式传输的 java.nio 选择器和 SocketChannel

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:22:29 25 4
gpt4 key购买 nike

我目前正在将 java.nio.channel.Selectors & SocketChannels 用于将打开一对多连接以继续流式传输到服务器的应用程序。我的应用程序有三个线程:StreamWriteWorker - 对 SocketChannel 执行写入操作,StreamReadWorker - 从缓冲区读取字节并解析内容,StreamTaskDispatcher - 执行 Selector 对 readyOps 的选择并为工作线程分派(dispatch)新的可运行对象。

问题 - 对选择器的选择方法的调用在第一次调用时仅返回一个 > 0 的值(有效的 readyOps);我能够一次性在所有就绪 channel 上执行写入和发送数据,但以下所有选择器选择方法的调用都返回 0。

问题:我是否需要在每次读/写后调用关闭 SocketChannel(我希望不需要!)?如果不是,是什么原因导致 SocketChannels 不可用于任何读/写操作?

很抱歉我不能发布代码,但我希望我已经足够清楚地解释了问题以便有人提供帮助。我已经搜索了答案,我看到您无法在关闭后重用 SocketChannel 连接,但我的 channel 不应该关闭,服务器永远不会收到 EOF 流结果。

我取得了一些进展,发现由于 json 解析错误,写操作没有在服务器应用程序上发生。因此,现在客户端应用程序代码上的 SocketChannel 在处理读取操作后准备好进行另一次写入操作。我猜这是 SocketChannels 的 TCP 特性。但是,SocketChannel 不能用于服务器应用程序端的另一个读取操作。这是 SocketChannels 的正常行为吗?读取操作后是否需要在客户端关闭连接并建立新连接?

这是我正在尝试做的代码示例:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

public class ClientServerTest {

private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
private ExecutorService executor = Executors.newFixedThreadPool(1);
private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

private class StreamWriteTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;

private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}

@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int results = 0;
while (buffer.hasRemaining()) {
try {
results = sc.write(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.interestOps(SelectionKey.OP_WRITE);
key.attach(data);
selector.wakeup();
return;
}
}

key.interestOps(SelectionKey.OP_READ);
key.attach(null);
selector.wakeup();
}

}

private class StreamReadTask implements Runnable {
private ByteBuffer buffer;
private SelectionKey key;
private Selector selector;

private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
this.buffer = buffer;
this.key = key;
this.selector = selector;
}

private boolean checkUUID(byte[] data) {
return uuidToSize.containsKey(new String(data));
}

@Override
public void run() {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
byte[] data = (byte[]) key.attachment();
if (data != null) {
buffer.put(data);
}
int count = 0;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

if (count == 0) {
buffer.flip();
data = new byte[buffer.limit()];
buffer.get(data);
if (checkUUID(data)) {
key.interestOps(SelectionKey.OP_READ);
key.attach(data);
} else {
System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
key.interestOps(SelectionKey.OP_WRITE);
key.attach(null);
}
}

if (count == -1) {
try {
sc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

selector.wakeup();
}

}

private class ClientWorker implements Runnable {

@Override
public void run() {
try {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress("127.0.0.1", 9001));
sc.register(selector, SelectionKey.OP_CONNECT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);

while (selector.isOpen()) {
int count = selector.select(10);

if (count == 0) {
continue;
}

Iterator<SelectionKey> it = selector.selectedKeys().iterator();

while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}

if (key.isConnectable()) {
sc = (SocketChannel) key.channel();
if (!sc.finishConnect()) {
continue;
}
sc.register(selector, SelectionKey.OP_WRITE);
}

if (key.isReadable()) {
key.interestOps(0);
executor.execute(new StreamReadTask(buffer, key, selector));
}
if (key.isWritable()) {
key.interestOps(0);
if(key.attachment() == null){
key.attach(dataQueue.take());
}
executor.execute(new StreamWriteTask(buffer, key, selector));
}
}
}
} catch (IOException ex) {
// Handle Exception
}catch(InterruptedException ex){

}

}
}

private class ServerWorker implements Runnable {
@Override
public void run() {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket socket = ssc.socket();
socket.bind(new InetSocketAddress(9001));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
DataHandler handler = new DataHandler();

while (selector.isOpen()) {
int count = selector.select(10);

if (count == 0) {
continue;
}

Iterator<SelectionKey> it = selector.selectedKeys().iterator();

while (it.hasNext()) {
final SelectionKey key = it.next();
it.remove();
if (!key.isValid()) {
continue;
}

if (key.isAcceptable()) {
ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
handler.readSocket(buffer, key);
}
if (key.isWritable()) {
handler.writeToSocket(buffer, key);
}
}
}

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

private class DataHandler {

private JsonObject parseData(StringBuilder builder) {
if (!builder.toString().endsWith("}")) {
return null;
}

JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(builder.toString());
return obj;
}

private void readSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
int count = Integer.MAX_VALUE;
int readAttempts = 0;
try {
while ((count = sc.read(buffer)) > 0) {
readAttempts++;
}
} catch (IOException e) {
e.printStackTrace();
}

if (count == 0) {
buffer.flip();
StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
.attachment() : new StringBuilder();
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE);
System.out.println(buffer);
CharBuffer charBuffer = decoder.decode(buffer);
String content = charBuffer.toString();
charBuffer = null;
builder.append(content);
System.out.println(content);
JsonObject obj = parseData(builder);
if (obj == null) {
key.attach(builder);
key.interestOps(SelectionKey.OP_READ);
} else {
System.out.println("data ~~~~~~~ " + builder.toString());
JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
key.attach(uuid.toString().getBytes());
key.interestOps(SelectionKey.OP_WRITE);
}
}

if (count == -1) {
key.attach(null);
sc.close();
}
}

private void writeToSocket(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
byte[] data = (byte[]) key.attachment();
buffer.clear();
buffer.put(data);
buffer.flip();
int writeAttempts = 0;
while (buffer.hasRemaining()) {
int results = sc.write(buffer);
writeAttempts++;
System.out.println("Write Attempt #" + writeAttempts);
if (results == 0) {
buffer.compact();
buffer.flip();
data = new byte[buffer.remaining()];
buffer.get(data);
key.attach(data);
key.interestOps(SelectionKey.OP_WRITE);
break;
}
}

key.interestOps(SelectionKey.OP_READ);
key.attach(null);
}
}

public ClientServerTest() {
for (int index = 0; index < 1000; index++) {
JsonObject obj = new JsonObject();
String uuid = UUID.randomUUID().toString();
uuidToSize.put(uuid, uuid.length());
obj.addProperty("uuid", uuid);
String data = RandomStringUtils.randomAlphanumeric(10000);
obj.addProperty("event", data);
dataQueue.add(obj.toString().getBytes());
}

Thread serverWorker = new Thread(new ServerWorker());
serverWorker.start();

Thread clientWorker = new Thread(new ClientWorker());
clientWorker.start();

}

/**
* @param args
*/
public static void main(String[] args) {
ClientServerTest test = new ClientServerTest();
for(;;){

}
}

}

最佳答案

  1. 处理 OP_CONNECT 的正确方法是尝试一次 finishConnect(),如果成功,注销 OP_CONNECT 并注册 OP_READOP_WRITE,可能是后者,因为您是客户。在非阻塞模式下循环和 hibernate 没有意义。如果 finishConnect() 返回 false,OP_CONNECT 将再次触发。

  2. 您对 !key.isAcceptable()!key.isReadable()!key.isWriteable() 的处理> 绝对没有任何意义。如果 key 是可接受的,则调用 accept()。如果可读,调用 read()。如果它是可写的,调用 write()。就这么简单。

  3. 您需要注意 channel 几乎总是可写的,除了套接字发送缓冲区已满的短暂时间。因此,只有在您有要写的东西时才注册 OP_WRITE,或者更好的是您已经尝试写入并获得零返回之后;然后当 OP_WRITE 触发时,重试写入并取消注册 OP_WRITE 除非你得到另一个零。

  4. 您使用 ByteBuffer 太省钱了。实际上,您每个 channel 都需要一个。您可以将其保存为 key 附件,以便在需要时取回。否则,您将无法累积肯定会发生的部分读取,也无法以任何方式重试写入。

关于用于继续流式传输的 java.nio 选择器和 SocketChannel,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10716057/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com