gpt4 book ai didi

Java 选择器在接受连接后卡在 select() 上

转载 作者:行者123 更新时间:2023-11-30 04:53:03 25 4
gpt4 key购买 nike

我目前正在做一项家庭作业,我必须创建一个使用 Java 选择器接受连接并通过套接字 channel 进行通信的服务器。客户端使用自己的套接字 channel 连接到服务器,服务器接受连接,将新的套接字 channel 设置为 OP_READ 并等待读取信息。下面是服务器的run方法以及服务器的accept和read方法。

//TODO: Figure out why the data is being read over and over again
public void run()
{

while(true)
{
try{


// Process potential change requests
processChangeRequests();
//select all the things! returns the amount of channels that are ready

//PPP: Select is stuck after accepted socket is changed to OP_READ -- data is not read
this.selector.select();

//DEBUG
System.out.println("Selector Selected Something!");

//get the set of keys of the channels that are ready
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

while(keyIterator.hasNext()) {

//get the key itself
SelectionKey key = keyIterator.next();
keyIterator.remove();

System.out.println("key: "+key.interestOps());

if(!key.isValid())
{
continue;
}

//conditions
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
this.accept(key);
}
else if (key.isReadable()) {
// a channel is ready for reading
this.readData(key);
}
else if (key.isWritable()) {
// a channel is ready for writing

//this.fPool.submitTask(new MessageDigestProcessor(key,this.buffer.array()));
}


}
//fPool.submitTask(new MessageDigestProcessor(socket));
}catch (Exception e) { e.printStackTrace(); }
}
}

private void accept(SelectionKey key) throws IOException
{
System.out.println("Accepted Connection!");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
SocketChannel clientSocketChannel = serverSocketChannel.accept();
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(this.selector, SelectionKey.OP_READ);
}

/**
* Reads a message digest from the input stream from the socket
* @return the message digest read from the input stream or "" if there was a problem reading
* TODO read until there is no more data to be read?
* TODO do not close the socket channels unless there is an exception from reading
*/
private void readData(SelectionKey key) throws IOException
{
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
//DEBUG
System.out.println("Message received from: "+clientSocketChannel.socket().getInetAddress());
//clear the buffer before reading
this.buffer.clear();
int numRead;
try
{
numRead = clientSocketChannel.read(this.buffer);
//DEBUG
System.out.println("num read: "+numRead);

} catch (IOException e)
{
key.cancel();
clientSocketChannel.close();
return;
}

if(numRead==-1)
{
key.cancel();
key.channel().close();
return;
}

//DEBUG
try {
System.out.println(Utility.SHA1FromBytes(this.buffer.array()));
} catch (NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

//time for writing!
key.interestOps(SelectionKey.OP_WRITE);

}

在我使用客户端连接到服务器后,接受工作正常,但在客户端写入数据后,服务器在 this.selector.select() 上被阻止,因此它永远不会调用 readData()。我有什么遗漏的吗?我通过 Eclipse 中的调试器跟踪代码,然后就停止了。

编辑:这是客户端代码

package cs455.scaling;

import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Client implements Runnable {

//stores all the hash codes for all messages that are sent to the server
private final LinkedList<String> hashCodes = new LinkedList<String>();
private final long messageRate;
private final ByteBuffer buffer = ByteBuffer.allocate(8192);
private final Selector selector;
private final InetSocketAddress serverSocketAddress;//maintain an open connection at all times, bro. Why should I shut it?
private final List<ChangeRequest> changeRequests = new LinkedList<ChangeRequest>();
//private final Thread senderThread;


public Client(String ipAddress, int port, long messageRate) throws UnknownHostException, IOException
{
this.serverSocketAddress = new InetSocketAddress(InetAddress.getByName(ipAddress),port);
this.messageRate = messageRate;
this.selector = Selector.open();
// senderThread = new Thread(new MessageDigestSender(this));
// senderThread.start();
}

public long getMessageRate()
{
return messageRate;
}

/**
* Generates a message digest and sends it over the connected socket
* TODO open new connection each time a send needs to occur?
* @throws IOException
*/
public void sendMessageDigest() throws IOException
{
initConnection();
//generate the message
byte [] data = generateMessageDigest();
//prepare the data
buffer.clear();
buffer.put(data);

this.selector.wakeup();
}

/**
* Does the actual writing of the message
* @param SelectionKey key that represents the channel that is being written to
*/
private void write(SelectionKey key) throws IOException
{
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(buffer);
System.out.println("Wrote data to Server...");
key.interestOps(SelectionKey.OP_READ);//not interested in writing for right now -- wait for message from the server
}

public void run()
{
while(true)
{
try{

//process the socket channel op changes
processChangeRequests();

this.selector.select();

//get the set of keys of the channels that are ready
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();

while(keyIterator.hasNext()) {

//get the key itself
SelectionKey key = keyIterator.next();
keyIterator.remove();

//System.out.println(key.interestOps());

//avoid invalid keys
if(!key.isValid())
{
continue;
}
if (key.isConnectable()) {
this.finishConnection(key);
} else if (key.isReadable()) {
this.readData(key);
} else if (key.isWritable()) {
this.write(key);
}
}
} catch (Exception e) { e.printStackTrace(); }
}
}

/**
* Method that queues up changes that need to be made to selection keys before they are processed
* this is useful for when multiple threads need to make changes to selection keys
* XXX: Used when the caller is NOT the selecting thread
*/
private void processChangeRequests() throws IOException
{
synchronized(this.changeRequests)
{
Iterator<ChangeRequest> changes = this.changeRequests.iterator();
while(changes.hasNext())
{
ChangeRequest changeRequest = changes.next();
switch(changeRequest.type)
{
case(ChangeRequest.CHANGE_OP):
changeRequest.channel.keyFor(this.selector).interestOps(changeRequest.ops);
break;
case ChangeRequest.REGISTER:
changeRequest.channel.register(this.selector, changeRequest.ops);
break;
}
}
this.changeRequests.clear();
}
}


/**
* Initialize the socket channel on the specified ip address and port
* configure it for non-blocking
* @param ipAddress
* @param port
* @throws IOException
*/
private void initConnection() throws IOException
{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(this.serverSocketAddress);

//a request is made because the selecting thread is not the caller of this function
synchronized(this.changeRequests){
this.changeRequests.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER,SelectionKey.OP_CONNECT));
}
}

/**
* Finish the connection by calling finishConnect() on the channel and then setting it to OP_WRITE
* @param key
*/
private void finishConnection(SelectionKey key)
{
// a connection was established with a remote server.
SocketChannel socketChannel = (SocketChannel) key.channel();

// Finish the connection. If the connection operation failed
// this will raise an IOException.
try {
socketChannel.finishConnect();
System.out.println("Finished connecting to server");
} catch (IOException e) {
// Cancel the channel's registration with our selector
e.printStackTrace();
key.cancel();
return;
}

// Register an interest in writing on this channel
key.interestOps(SelectionKey.OP_WRITE);
}

/**
* Serves as a wrapper around the SHA1FromBytes method
* It generates a byte array using the util.Random class and then generates a message digest from those bytes
* If the algorithm doesn't exist then a blank string is returned
*/
private byte [] generateMessageDigest()
{
Random random = new Random();
byte [] data = new byte[8192];
random.nextBytes(data);
String digest = "";
try {
digest = Utility.SHA1FromBytes(data);
//add it to the hashCodes linkedList
hashCodes.add(digest);
System.out.println("Generated Digest: "+digest);
} catch (NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return data;

}

/**
* Reads a message digest from the input stream from the socket
* @return the message digest read from the input stream or "" if there was a problem reading
* TODO read until there is no more data to be read?
* TODO do not close the socket channels unless there is an exception from reading
*/
private void readData(SelectionKey key) throws IOException
{
SocketChannel clientSocketChannel = (SocketChannel) key.channel();
//DEBUG
System.out.println("Message received from: "+clientSocketChannel.socket().getInetAddress());
//clear the buffer before reading
this.buffer.clear();
int numRead;
try
{
numRead = clientSocketChannel.read(this.buffer);
//DEBUG
System.out.println("num read: "+numRead);

} catch (IOException e)
{
//problem reading from a socket channel
e.printStackTrace();
key.cancel();
clientSocketChannel.close();
return;
}

this.buffer.flip();
System.out.println("Received Message Digest: "+new String(this.buffer.array()));

//done! open business later on!
key.channel().close();
key.channel().keyFor(this.selector).cancel();


}


public static void main(String [] args)
{
String serverHost = "";
int serverPort = 0;
long messageRate = 0;

if(args.length<3)
{
System.err.println("Format: java cs455.scaling.Client [server-host] [server-port] [message-rate]");
System.exit(0);
}

//parse the arguments out
try{

serverHost = args[0];
serverPort = Integer.parseInt(args[1]);
messageRate = Long.parseLong(args[2]);

} catch(NumberFormatException ne) {
ne.printStackTrace();
} catch (ArrayIndexOutOfBoundsException ae)
{
ae.printStackTrace();
}

//start the client
try
{
Client client = new Client(serverHost,serverPort,messageRate);
Thread t = new Thread(client);
t.start();
client.sendMessageDigest();

} catch(IOException i)
{
i.printStackTrace();
}

}
}

最佳答案

您忽略了 read() 和 write() 的返回值,因此您无法在读取时检测短或零长度写入或 EOS 条件。

关于Java 选择器在接受连接后卡在 select() 上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9446441/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com