gpt4 book ai didi

Java 网络 : evented Socket/InputStream

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:07:27 25 4
gpt4 key购买 nike

我正在 Java 的套接字上实现一个面向事件的层,我想知道是否有一种方法可以确定是否有待读取的数据。

我通常的方法是从套接字读取到缓冲区,并在缓冲区填充给定字节数时调用提供的回调(如果每次到达时都需要触发回调,则可以为 0 ),但我怀疑 Java 已经在为我做缓冲。

InputStream 的 available() 方法是否可靠?我应该只 read() 并在 Socket 之上做我自己的缓冲吗?还是有别的办法?

最佳答案

简而言之,不。 available() 不可靠(至少不适合我)。我推荐使用 java.nio.channels.SocketChannel 连接 SelectorSelectionKey。这个解决方案有点基于事件,但比普通套接字更复杂。

对于客户:

  1. 构造套接字 channel (socket),打开一个选择器(selector = Selector.open();)。
  2. 使用非阻塞socket.configureBlocking(false);
  3. 为连接注册选择器socket.register(selector, SelectionKey.OP_CONNECT);
  4. 连接 socket.connect(new InetSocketAddress(host, port));
  5. 看看有没有新的selector.select();
  6. 如果“new”是指连接成功,则为OP_READ注册选择器;如果"new"指的是可用数据,则直接从套接字中读取。

但是,为了使其异步,您需要设置一个单独的线程(尽管套接字被创建为非阻塞,线程无论如何都会阻塞)来检查是否有东西到达。

对于服务器,有 ServerSocketChannel,您可以为它使用 OP_ACCEPT

供引用,这是我的代码(客户端),应该给你一个提示:

 private Thread readingThread = new ListeningThread();

/**
* Listening thread - reads messages in a separate thread so the application does not get blocked.
*/
private class ListeningThread extends Thread {
public void run() {
running = true;
try {
while(!close) listen();
messenger.close();
}
catch(ConnectException ce) {
doNotifyConnectionFailed(ce);
}
catch(Exception e) {
// e.printStackTrace();
messenger.close();
}
running = false;
}
}

/**
* Connects to host and port.
* @param host Host to connect to.
* @param port Port of the host machine to connect to.
*/
public void connect(String host, int port) {
try {
SocketChannel socket = SocketChannel.open();
socket.configureBlocking(false);
socket.register(this.selector, SelectionKey.OP_CONNECT);
socket.connect(new InetSocketAddress(host, port));
}
catch(IOException e) {
this.doNotifyConnectionFailed(e);
}
}

/**
* Waits for an event to happen, processes it and then returns.
* @throws IOException when something goes wrong.
*/
protected void listen() throws IOException {
// see if there are any new things going on
this.selector.select();
// process events
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// check validity
if(key.isValid()) {
// if connectable...
if(key.isConnectable()) {
// ...establish connection, make messenger, and notify everyone
SocketChannel client = (SocketChannel)key.channel();
// now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
if(client!=null && client.finishConnect()) {
client.register(this.selector, SelectionKey.OP_READ);
}
}
// if readable, tell messenger to read bytes
else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
// read message here
}
}
}
}

/**
* Starts the client.
*/
public void start() {
// start a reading thread
if(!this.running) {
this.readingThread = new ListeningThread();
this.readingThread.start();
}
}

/**
* Tells the client to close at nearest possible moment.
*/
public void close() {
this.close = true;
}

对于服务器:

 /**
* Constructs a server.
* @param port Port to listen to.
* @param protocol Protocol of messages.
* @throws IOException when something goes wrong.
*/
public ChannelMessageServer(int port) throws IOException {
this.server = ServerSocketChannel.open();
this.server.configureBlocking(false);
this.server.socket().bind(new InetSocketAddress(port));
this.server.register(this.selector, SelectionKey.OP_ACCEPT);
}

/**
* Waits for event, then exits.
* @throws IOException when something goes wrong.
*/
protected void listen() throws IOException {
// see if there are any new things going on
this.selector.select();
// process events
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
// do something with the connected socket
iter.remove();
if(key.isValid()) this.process(key);
}
}

/**
* Processes a selection key.
* @param key SelectionKey.
* @throws IOException when something is wrong.
*/
protected void process(SelectionKey key) throws IOException {
// if incoming connection
if(key.isAcceptable()) {
// get client
SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
try {
client.configureBlocking(false);
client.register(this.selector, SelectionKey.OP_READ);
}
catch(Exception e) {
// catch
}
}
// if readable, tell messenger to read
else if(key.isReadable()) {
// read
}
}

希望这对您有所帮助。

关于Java 网络 : evented Socket/InputStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5969827/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com