- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
本文会从传统的bio到nio再到aio自浅至深介绍,并附上完整的代码讲解.
下面代码中会使用这样一个例子:客户端发送一段算式的字符串到服务器,服务器计算后返回结果到客户端.
代码的所有说明,都直接作为注释,嵌入到代码中,看代码时就能更容易理解,代码中会用到一个计算结果的工具类,见文章代码部分.
相关的基础知识文章推荐: java 并发(多线程) 。
1、bio编程 。
1.1、传统的bio编程 。
网络编程的基本模型是c/s模型,即两个进程间的通信.
服务端提供ip和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信.
传统的同步阻塞模型开发中,serversocket负责绑定ip地址,启动监听端口;socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 。
简单的描述一下bio的服务端通信模型:采用bio通信模型的服务端,通常由一个独立的acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型.
传统bio通信模型图:
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死-掉-了.
同步阻塞式i/o创建的server源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.net.serversocket;
import
java.net.socket;
/**
* bio服务端源码
* @author yangtao__anxpp.com
* @version 1.0
*/
public
final
class
servernormal {
//默认的端口号
private
static
int
default_port =
12345
;
//单例的serversocket
private
static
serversocket server;
//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public
static
void
start()
throws
ioexception{
//使用默认值
start(default_port);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public
synchronized
static
void
start(
int
port)
throws
ioexception{
if
(server !=
null
)
return
;
try
{
//通过构造函数创建serversocket
//如果端口合法且空闲,服务端就监听成功
server =
new
serversocket(port);
system.out.println(
"服务器已启动,端口号:"
+ port);
//通过无线循环监听客户端连接
//如果没有客户端接入,将阻塞在accept操作上。
while
(
true
){
socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条socket链路
new
thread(
new
serverhandler(socket)).start();
}
}
finally
{
//一些必要的清理工作
if
(server !=
null
){
system.out.println(
"服务器已关闭。"
);
server.close();
server =
null
;
}
}
}
}
|
客户端消息处理线程serverhandler源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package
com.anxpp.io.calculator.bio;
import
java.io.bufferedreader;
import
java.io.ioexception;
import
java.io.inputstreamreader;
import
java.io.printwriter;
import
java.net.socket;
import
com.anxpp.io.utils.calculator;
/**
* 客户端线程
* @author yangtao__anxpp.com
* 用于处理一个客户端的socket链路
*/
public
class
serverhandler
implements
runnable{
private
socket socket;
public
serverhandler(socket socket) {
this
.socket = socket;
}
@override
public
void
run() {
bufferedreader in =
null
;
printwriter out =
null
;
try
{
in =
new
bufferedreader(
new
inputstreamreader(socket.getinputstream()));
out =
new
printwriter(socket.getoutputstream(),
true
);
string expression;
string result;
while
(
true
){
//通过bufferedreader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if
((expression = in.readline())==
null
)
break
;
system.out.println(
"服务器收到消息:"
+ expression);
try
{
result = calculator.cal(expression).tostring();
}
catch
(exception e){
result =
"计算错误:"
+ e.getmessage();
}
out.println(result);
}
}
catch
(exception e){
e.printstacktrace();
}
finally
{
//一些必要的清理工作
if
(in !=
null
){
try
{
in.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
in =
null
;
}
if
(out !=
null
){
out.close();
out =
null
;
}
if
(socket !=
null
){
try
{
socket.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
socket =
null
;
}
}
}
}
|
同步阻塞式i/o创建的client源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package
com.anxpp.io.calculator.bio;
import
java.io.bufferedreader;
import
java.io.ioexception;
import
java.io.inputstreamreader;
import
java.io.printwriter;
import
java.net.socket;
/**
* 阻塞式i/o创建的客户端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
client {
//默认的端口号
private
static
int
default_server_port =
12345
;
private
static
string default_server_ip =
"127.0.0.1"
;
public
static
void
send(string expression){
send(default_server_port,expression);
}
public
static
void
send(
int
port,string expression){
system.out.println(
"算术表达式为:"
+ expression);
socket socket =
null
;
bufferedreader in =
null
;
printwriter out =
null
;
try
{
socket =
new
socket(default_server_ip,port);
in =
new
bufferedreader(
new
inputstreamreader(socket.getinputstream()));
out =
new
printwriter(socket.getoutputstream(),
true
);
out.println(expression);
system.out.println(
"___结果为:"
+ in.readline());
}
catch
(exception e){
e.printstacktrace();
}
finally
{
//一下必要的清理工作
if
(in !=
null
){
try
{
in.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
in =
null
;
}
if
(out !=
null
){
out.close();
out =
null
;
}
if
(socket !=
null
){
try
{
socket.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
socket =
null
;
}
}
}
}
|
测试代码,为了方便在控制台看输出结果,放到同一个程序(jvm)中运行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.util.random;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
public
static
void
main(string[] args)
throws
interruptedexception {
//运行服务器
new
thread(
new
runnable() {
@override
public
void
run() {
try
{
serverbetter.start();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
}).start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
char
operators[] = {
'+'
,
'-'
,
'*'
,
'/'
};
random random =
new
random(system.currenttimemillis());
new
thread(
new
runnable() {
@suppresswarnings
(
"static-access"
)
@override
public
void
run() {
while
(
true
){
//随机产生算术表达式
string expression = random.nextint(
10
)+
""
+operators[random.nextint(
4
)]+(random.nextint(
10
)+
1
);
client.send(expression);
try
{
thread.currentthread().sleep(random.nextint(
1000
));
}
catch
(interruptedexception e) {
e.printstacktrace();
}
}
}
}).start();
}
}
|
其中一次的运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
服务器已启动,端口号:
12345
算术表达式为:
4
-
2
服务器收到消息:
4
-
2
___结果为:
2
算术表达式为:
5
-
10
服务器收到消息:
5
-
10
___结果为:-
5
算术表达式为:
0
-
9
服务器收到消息:
0
-
9
___结果为:-
9
算术表达式为:
0
+
6
服务器收到消息:
0
+
6
___结果为:
6
算术表达式为:
1
/
6
服务器收到消息:
1
/
6
___结果为:
0.16666666666666666
...
|
从以上代码,很容易看出,bio主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工).
1.2、伪异步i/o编程 。
为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程(需要了解更多请参考前面提供的文章),实现1个或多个线程处理n个客户端的模型(但是底层还是使用的同步阻塞i/o),通常被称为“伪异步i/o模型“.
伪异步i/o模型图:
实现很简单,我们只需要将新建线程的地方,交给线程池管理即可,只需要改动刚刚的server代码即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.net.serversocket;
import
java.net.socket;
import
java.util.concurrent.executorservice;
import
java.util.concurrent.executors;
/**
* bio服务端源码__伪异步i/o
* @author yangtao__anxpp.com
* @version 1.0
*/
public
final
class
serverbetter {
//默认的端口号
private
static
int
default_port =
12345
;
//单例的serversocket
private
static
serversocket server;
//线程池 懒汉式的单例
private
static
executorservice executorservice = executors.newfixedthreadpool(
60
);
//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public
static
void
start()
throws
ioexception{
//使用默认值
start(default_port);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public
synchronized
static
void
start(
int
port)
throws
ioexception{
if
(server !=
null
)
return
;
try
{
//通过构造函数创建serversocket
//如果端口合法且空闲,服务端就监听成功
server =
new
serversocket(port);
system.out.println(
"服务器已启动,端口号:"
+ port);
//通过无线循环监听客户端连接
//如果没有客户端接入,将阻塞在accept操作上。
while
(
true
){
socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条socket链路
executorservice.execute(
new
serverhandler(socket));
}
}
finally
{
//一些必要的清理工作
if
(server !=
null
){
system.out.println(
"服务器已关闭。"
);
server.close();
server =
null
;
}
}
}
}
|
测试运行结果是一样的.
我们知道,如果使用cachedthreadpool线程池(不限制线程数量,如果不清楚请参考文首提供的文章),其实除了能自动帮我们管理线程(复用),看起来也就像是1:1的客户端:线程数模型,而使用fixedthreadpool我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了n:m的伪异步i/o模型.
但是,正因为限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对socket的输入流就行读取时,会一直阻塞,直到发生:
所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端.
而后面即将介绍的nio,就能解决这个难题.
2、nio 编程 。
jdk 1.4中的java.nio.*包中引入新的java i/o库,其目的是提高速度。实际上,“旧”的i/o包已经使用nio重新实现过,即使我们不显式的使用nio编程,也能从中受益。速度的提高在文件i/o和网络i/o中都可能会发生,但本文只讨论后者.
2.1、简介 。
nio我们一般认为是new i/o(也是官方的叫法),因为它是相对于老的i/o类库新增的(其实在jdk 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为non-block i/o,即非阻塞i/o,因为这样叫,更能体现它的特点。而下文中的nio,不是指整个新的i/o库,而是非阻塞i/o.
nio提供了与传统bio模型中的socket和serversocket相对应的socketchannel和serversocketchannel两种不同的套接字通道实现.
新增的着两种通道都支持阻塞和非阻塞两种模式.
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反.
对于低负载、低并发的应用程序,可以使用同步阻塞i/o来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用nio的非阻塞模式来开发.
下面会先对基础知识进行介绍.
2.2、缓冲区 buffer 。
buffer是一个对象,包含一些要写入或者读出的数据.
在nio库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问nio中的数据,都是通过缓冲区进行操作.
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息.
具体的缓存区有这些:bytebuffe、charbuffer、 shortbuffer、intbuffer、longbuffer、floatbuffer、doublebuffer。他们实现了相同的接口:buffer.
2.3、通道 channel 。
我们对数据的读取和写入要通过channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作.
底层的操作系统的通道一般都是全双工的,所以全双工的channel比流能更好的映射底层操作系统的api.
channel主要分两大类:
后面代码会涉及的serversocketchannel和socketchannel都是selectablechannel的子类.
2.4、多路复用器 selector 。
selector是java nio 编程的基础.
selector提供选择已经就绪的任务的能力:selector会不断轮询注册在其上的channel,如果某个channel上面发生读或者写事件,这个channel就处于就绪状态,会被selector轮询出来,然后通过selectionkey可以获取就绪channel的集合,进行后续的i/o操作.
一个selector可以同时轮询多个channel,因为jdk使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责selector的轮询,就可以接入成千上万的客户端.
2.5、nio服务端 。
代码比传统的socket编程看起来要复杂不少.
直接贴代码吧,以注释的形式给出代码说明.
nio创建的server源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package
com.anxpp.io.calculator.nio;
public
class
server {
private
static
int
default_port =
12345
;
private
static
serverhandle serverhandle;
public
static
void
start(){
start(default_port);
}
public
static
synchronized
void
start(
int
port){
if
(serverhandle!=
null
)
serverhandle.stop();
serverhandle =
new
serverhandle(port);
new
thread(serverhandle,
"server"
).start();
}
public
static
void
main(string[] args){
start();
}
}
|
serverhandle:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
package
com.anxpp.io.calculator.nio;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.selectionkey;
import
java.nio.channels.selector;
import
java.nio.channels.serversocketchannel;
import
java.nio.channels.socketchannel;
import
java.util.iterator;
import
java.util.set;
import
com.anxpp.io.utils.calculator;
/**
* nio服务端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
serverhandle
implements
runnable{
private
selector selector;
private
serversocketchannel serverchannel;
private
volatile
boolean
started;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public
serverhandle(
int
port) {
try
{
//创建选择器
selector = selector.open();
//打开监听通道
serverchannel = serversocketchannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverchannel.configureblocking(
false
);
//开启非阻塞模式
//绑定端口 backlog设为1024
serverchannel.socket().bind(
new
inetsocketaddress(port),
1024
);
//监听客户端连接请求
serverchannel.register(selector, selectionkey.op_accept);
//标记服务器已开启
started =
true
;
system.out.println(
"服务器已启动,端口号:"
+ port);
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
}
public
void
stop(){
started =
false
;
}
@override
public
void
run() {
//循环遍历selector
while
(started){
try
{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(
1000
);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
set<selectionkey> keys = selector.selectedkeys();
iterator<selectionkey> it = keys.iterator();
selectionkey key =
null
;
while
(it.hasnext()){
key = it.next();
it.remove();
try
{
handleinput(key);
}
catch
(exception e){
if
(key !=
null
){
key.cancel();
if
(key.channel() !=
null
){
key.channel().close();
}
}
}
}
}
catch
(throwable t){
t.printstacktrace();
}
}
//selector关闭后会自动释放里面管理的资源
if
(selector !=
null
)
try
{
selector.close();
}
catch
(exception e) {
e.printstacktrace();
}
}
private
void
handleinput(selectionkey key)
throws
ioexception{
if
(key.isvalid()){
//处理新接入的请求消息
if
(key.isacceptable()){
serversocketchannel ssc = (serversocketchannel) key.channel();
//通过serversocketchannel的accept创建socketchannel实例
//完成该操作意味着完成tcp三次握手,tcp物理链路正式建立
socketchannel sc = ssc.accept();
//设置为非阻塞的
sc.configureblocking(
false
);
//注册为读
sc.register(selector, selectionkey.op_read);
}
//读消息
if
(key.isreadable()){
socketchannel sc = (socketchannel) key.channel();
//创建bytebuffer,并开辟一个1m的缓冲区
bytebuffer buffer = bytebuffer.allocate(
1024
);
//读取请求码流,返回读取到的字节数
int
readbytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if
(readbytes>
0
){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte
[] bytes =
new
byte
[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
string expression =
new
string(bytes,
"utf-8"
);
system.out.println(
"服务器收到消息:"
+ expression);
//处理数据
string result =
null
;
try
{
result = calculator.cal(expression).tostring();
}
catch
(exception e){
result =
"计算错误:"
+ e.getmessage();
}
//发送应答消息
dowrite(sc,result);
}
//没有读取到字节 忽略
// else if(readbytes==0);
//链路已经关闭,释放资源
else
if
(readbytes<
0
){
key.cancel();
sc.close();
}
}
}
}
//异步发送应答消息
private
void
dowrite(socketchannel channel,string response)
throws
ioexception{
//将消息编码为字节数组
byte
[] bytes = response.getbytes();
//根据数组容量创建bytebuffer
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writebuffer.put(bytes);
//flip操作
writebuffer.flip();
//发送缓冲区的字节数组
channel.write(writebuffer);
//****此处不含处理“写半包”的代码
}
}
|
可以看到,创建nio服务端的主要步骤如下:
因为应答消息的发送,socketchannel也是异步非阻塞的,所以不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询selector将没有发送完的消息发送完毕,然后通过buffer的hasremain()方法判断消息是否发送完成.
2.6、nio客户端 。
还是直接上代码吧,过程也不需要太多解释了,跟服务端代码有点类似.
client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
com.anxpp.io.calculator.nio;
public
class
client {
private
static
string default_host =
"127.0.0.1"
;
private
static
int
default_port =
12345
;
private
static
clienthandle clienthandle;
public
static
void
start(){
start(default_host,default_port);
}
public
static
synchronized
void
start(string ip,
int
port){
if
(clienthandle!=
null
)
clienthandle.stop();
clienthandle =
new
clienthandle(ip,port);
new
thread(clienthandle,
"server"
).start();
}
//向服务器发送消息
public
static
boolean
sendmsg(string msg)
throws
exception{
if
(msg.equals(
"q"
))
return
false
;
clienthandle.sendmsg(msg);
return
true
;
}
public
static
void
main(string[] args){
start();
}
}
|
clienthandle:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
package
com.anxpp.io.calculator.nio;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.selectionkey;
import
java.nio.channels.selector;
import
java.nio.channels.socketchannel;
import
java.util.iterator;
import
java.util.set;
/**
* nio客户端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
clienthandle
implements
runnable{
private
string host;
private
int
port;
private
selector selector;
private
socketchannel socketchannel;
private
volatile
boolean
started;
public
clienthandle(string ip,
int
port) {
this
.host = ip;
this
.port = port;
try
{
//创建选择器
selector = selector.open();
//打开监听通道
socketchannel = socketchannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketchannel.configureblocking(
false
);
//开启非阻塞模式
started =
true
;
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
}
public
void
stop(){
started =
false
;
}
@override
public
void
run() {
try
{
doconnect();
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
//循环遍历selector
while
(started){
try
{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(
1000
);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
set<selectionkey> keys = selector.selectedkeys();
iterator<selectionkey> it = keys.iterator();
selectionkey key =
null
;
while
(it.hasnext()){
key = it.next();
it.remove();
try
{
handleinput(key);
}
catch
(exception e){
if
(key !=
null
){
key.cancel();
if
(key.channel() !=
null
){
key.channel().close();
}
}
}
}
}
catch
(exception e){
e.printstacktrace();
system.exit(
1
);
}
}
//selector关闭后会自动释放里面管理的资源
if
(selector !=
null
)
try
{
selector.close();
}
catch
(exception e) {
e.printstacktrace();
}
}
private
void
handleinput(selectionkey key)
throws
ioexception{
if
(key.isvalid()){
socketchannel sc = (socketchannel) key.channel();
if
(key.isconnectable()){
if
(sc.finishconnect());
else
system.exit(
1
);
}
//读消息
if
(key.isreadable()){
//创建bytebuffer,并开辟一个1m的缓冲区
bytebuffer buffer = bytebuffer.allocate(
1024
);
//读取请求码流,返回读取到的字节数
int
readbytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if
(readbytes>
0
){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte
[] bytes =
new
byte
[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
string result =
new
string(bytes,
"utf-8"
);
system.out.println(
"客户端收到消息:"
+ result);
}
//没有读取到字节 忽略
// else if(readbytes==0);
//链路已经关闭,释放资源
else
if
(readbytes<
0
){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private
void
dowrite(socketchannel channel,string request)
throws
ioexception{
//将消息编码为字节数组
byte
[] bytes = request.getbytes();
//根据数组容量创建bytebuffer
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writebuffer.put(bytes);
//flip操作
writebuffer.flip();
//发送缓冲区的字节数组
channel.write(writebuffer);
//****此处不含处理“写半包”的代码
}
private
void
doconnect()
throws
ioexception{
if
(socketchannel.connect(
new
inetsocketaddress(host,port)));
else
socketchannel.register(selector, selectionkey.op_connect);
}
public
void
sendmsg(string msg)
throws
exception{
socketchannel.register(selector, selectionkey.op_read);
dowrite(socketchannel, msg);
}
}
|
2.7、演示结果 。
首先运行服务器,顺便也运行一个客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package
com.anxpp.io.calculator.nio;
import
java.util.scanner;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
//运行服务器
server.start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
client.start();
while
(client.sendmsg(
new
scanner(system.in).nextline()));
}
}
|
我们也可以单独运行客户端,效果都是一样的.
一次测试的结果:
1
2
3
4
5
6
7
|
服务器已启动,端口号:
12345
1
+
2
+
3
+
4
+
5
+
6
服务器收到消息:
1
+
2
+
3
+
4
+
5
+
6
客户端收到消息:
21
1
*
2
/
3
-
4
+
5
*
6
/
7
-
8
服务器收到消息:
1
*
2
/
3
-
4
+
5
*
6
/
7
-
8
客户端收到消息:-
7.0476190476190474
|
运行多个客户端,都是没有问题的.
3、aio编程 。
nio 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现.
异步的套接字通道时真正的异步非阻塞i/o,对应于unix网络编程中的事件驱动i/o(aio)。他不需要过多的selector对注册的通道进行轮询即可实现异步读写,从而简化了nio的编程模型.
直接上代码吧.
3.1、server端代码 。
server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
com.anxpp.io.calculator.aio.server;
/**
* aio服务端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
server {
private
static
int
default_port =
12345
;
private
static
asyncserverhandler serverhandle;
public
volatile
static
long
clientcount =
0
;
public
static
void
start(){
start(default_port);
}
public
static
synchronized
void
start(
int
port){
if
(serverhandle!=
null
)
return
;
serverhandle =
new
asyncserverhandler(port);
new
thread(serverhandle,
"server"
).start();
}
public
static
void
main(string[] args){
server.start();
}
}
|
asyncserverhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package
com.anxpp.io.calculator.aio.server;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.channels.asynchronousserversocketchannel;
import
java.util.concurrent.countdownlatch;
public
class
asyncserverhandler
implements
runnable {
public
countdownlatch latch;
public
asynchronousserversocketchannel channel;
public
asyncserverhandler(
int
port) {
try
{
//创建服务端通道
channel = asynchronousserversocketchannel.open();
//绑定端口
channel.bind(
new
inetsocketaddress(port));
system.out.println(
"服务器已启动,端口号:"
+ port);
}
catch
(ioexception e) {
e.printstacktrace();
}
}
@override
public
void
run() {
//countdownlatch初始化
//它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
//此处,让现场在此阻塞,防止服务端执行完成后退出
//也可以使用while(true)+sleep
//生成环境就不需要担心这个问题,以为服务端是不会退出的
latch =
new
countdownlatch(
1
);
//用于接收客户端的连接
channel.accept(
this
,
new
accepthandler());
try
{
latch.await();
}
catch
(interruptedexception e) {
e.printstacktrace();
}
}
}
|
accepthandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
com.anxpp.io.calculator.aio.server;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
//作为handler接收客户端连接
public
class
accepthandler
implements
completionhandler<asynchronoussocketchannel, asyncserverhandler> {
@override
public
void
completed(asynchronoussocketchannel channel,asyncserverhandler serverhandler) {
//继续接受其他客户端的请求
server.clientcount++;
system.out.println(
"连接的客户端数:"
+ server.clientcount);
serverhandler.channel.accept(serverhandler,
this
);
//创建新的buffer
bytebuffer buffer = bytebuffer.allocate(
1024
);
//异步读 第三个参数为接收消息回调的业务handler
channel.read(buffer, buffer,
new
readhandler(channel));
}
@override
public
void
failed(throwable exc, asyncserverhandler serverhandler) {
exc.printstacktrace();
serverhandler.latch.countdown();
}
}
|
readhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
package
com.anxpp.io.calculator.aio.server;
import
java.io.ioexception;
import
java.io.unsupportedencodingexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
com.anxpp.io.utils.calculator;
public
class
readhandler
implements
completionhandler<integer, bytebuffer> {
//用于读取半包消息和发送应答
private
asynchronoussocketchannel channel;
public
readhandler(asynchronoussocketchannel channel) {
this
.channel = channel;
}
//读取到消息后的处理
@override
public
void
completed(integer result, bytebuffer attachment) {
//flip操作
attachment.flip();
//根据
byte
[] message =
new
byte
[attachment.remaining()];
attachment.get(message);
try
{
string expression =
new
string(message,
"utf-8"
);
system.out.println(
"服务器收到消息: "
+ expression);
string calrresult =
null
;
try
{
calrresult = calculator.cal(expression).tostring();
}
catch
(exception e){
calrresult =
"计算错误:"
+ e.getmessage();
}
//向客户端发送消息
dowrite(calrresult);
}
catch
(unsupportedencodingexception e) {
e.printstacktrace();
}
}
//发送消息
private
void
dowrite(string result) {
byte
[] bytes = result.getbytes();
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
writebuffer.put(bytes);
writebuffer.flip();
//异步写数据 参数与前面的read一样
channel.write(writebuffer, writebuffer,
new
completionhandler<integer, bytebuffer>() {
@override
public
void
completed(integer result, bytebuffer buffer) {
//如果没有发送完,就继续发送直到完成
if
(buffer.hasremaining())
channel.write(buffer, buffer,
this
);
else
{
//创建新的buffer
bytebuffer readbuffer = bytebuffer.allocate(
1024
);
//异步读 第三个参数为接收消息回调的业务handler
channel.read(readbuffer, readbuffer,
new
readhandler(channel));
}
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
try
{
channel.close();
}
catch
(ioexception e) {
}
}
});
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
try
{
this
.channel.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
}
|
ok,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是api比nio的使用起来真的简单多了,主要就是监听、读、写等各种completionhandler。此处本应有一个writehandler的,确实,我们在readhandler中,以一个匿名内部类实现了它.
下面看客户端代码.
3.2、client端代码 。
client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package
com.anxpp.io.calculator.aio.client;
import
java.util.scanner;
public
class
client {
private
static
string default_host =
"127.0.0.1"
;
private
static
int
default_port =
12345
;
private
static
asyncclienthandler clienthandle;
public
static
void
start(){
start(default_host,default_port);
}
public
static
synchronized
void
start(string ip,
int
port){
if
(clienthandle!=
null
)
return
;
clienthandle =
new
asyncclienthandler(ip,port);
new
thread(clienthandle,
"client"
).start();
}
//向服务器发送消息
public
static
boolean
sendmsg(string msg)
throws
exception{
if
(msg.equals(
"q"
))
return
false
;
clienthandle.sendmsg(msg);
return
true
;
}
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
client.start();
system.out.println(
"请输入请求消息:"
);
scanner scanner =
new
scanner(system.in);
while
(client.sendmsg(scanner.nextline()));
}
}
|
asyncclienthandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
asyncclienthandler
implements
completionhandler<
void
, asyncclienthandler>, runnable {
private
asynchronoussocketchannel clientchannel;
private
string host;
private
int
port;
private
countdownlatch latch;
public
asyncclienthandler(string host,
int
port) {
this
.host = host;
this
.port = port;
try
{
//创建异步的客户端通道
clientchannel = asynchronoussocketchannel.open();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
@override
public
void
run() {
//创建countdownlatch等待
latch =
new
countdownlatch(
1
);
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
clientchannel.connect(
new
inetsocketaddress(host, port),
this
,
this
);
try
{
latch.await();
}
catch
(interruptedexception e1) {
e1.printstacktrace();
}
try
{
clientchannel.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
//连接服务器成功
//意味着tcp三次握手完成
@override
public
void
completed(
void
result, asyncclienthandler attachment) {
system.out.println(
"客户端成功连接到服务器..."
);
}
//连接服务器失败
@override
public
void
failed(throwable exc, asyncclienthandler attachment) {
system.err.println(
"连接服务器失败..."
);
exc.printstacktrace();
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
//向服务器发送消息
public
void
sendmsg(string msg){
byte
[] req = msg.getbytes();
bytebuffer writebuffer = bytebuffer.allocate(req.length);
writebuffer.put(req);
writebuffer.flip();
//异步写
clientchannel.write(writebuffer, writebuffer,
new
writehandler(clientchannel, latch));
}
}
|
writehandler: 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
writehandler
implements
completionhandler<integer, bytebuffer> {
private
asynchronoussocketchannel clientchannel;
private
countdownlatch latch;
public
writehandler(asynchronoussocketchannel clientchannel,countdownlatch latch) {
this
.clientchannel = clientchannel;
this
.latch = latch;
}
@override
public
void
completed(integer result, bytebuffer buffer) {
//完成全部数据的写入
if
(buffer.hasremaining()) {
clientchannel.write(buffer, buffer,
this
);
}
else
{
//读取数据
bytebuffer readbuffer = bytebuffer.allocate(
1024
);
clientchannel.read(readbuffer,readbuffer,
new
readhandler(clientchannel, latch));
}
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
system.err.println(
"数据发送失败..."
);
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
}
}
}
|
readhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.io.unsupportedencodingexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
readhandler
implements
completionhandler<integer, bytebuffer> {
private
asynchronoussocketchannel clientchannel;
private
countdownlatch latch;
public
readhandler(asynchronoussocketchannel clientchannel,countdownlatch latch) {
this
.clientchannel = clientchannel;
this
.latch = latch;
}
@override
public
void
completed(integer result,bytebuffer buffer) {
buffer.flip();
byte
[] bytes =
new
byte
[buffer.remaining()];
buffer.get(bytes);
string body;
try
{
body =
new
string(bytes,
"utf-8"
);
system.out.println(
"客户端收到结果:"
+ body);
}
catch
(unsupportedencodingexception e) {
e.printstacktrace();
}
}
@override
public
void
failed(throwable exc,bytebuffer attachment) {
system.err.println(
"数据读取失败..."
);
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
}
}
}
|
这个api使用起来真的是很顺手.
3.3、测试 。
test:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
com.anxpp.io.calculator.aio;
import
java.util.scanner;
import
com.anxpp.io.calculator.aio.client.client;
import
com.anxpp.io.calculator.aio.server.server;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
//运行服务器
server.start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
client.start();
system.out.println(
"请输入请求消息:"
);
scanner scanner =
new
scanner(system.in);
while
(client.sendmsg(scanner.nextline()));
}
}
|
我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发.
读者可以自己修改client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试.
下面是其中一次参数的输出:
1
2
3
4
5
6
7
8
9
10
11
|
服务器已启动,端口号:
12345
请输入请求消息:
客户端成功连接到服务器...
连接的客户端数:
1
123456
+
789
+
456
服务器收到消息:
123456
+
789
+
456
客户端收到结果:
124701
9526
*
56
服务器收到消息:
9526
*
56
客户端收到结果:
533456
...
|
aio是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手.
下面就比较一下,几种i/o编程的优缺点.
4、各种i/o的对比 。
先以一张表来直观的对比一下:
具体选择什么样的模型或者nio框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的nio做服务端;相反,就应考虑使用nio或者相关的框架了.
5、附录 。
上文中服务端使用到的用于计算的工具类:
1
2
3
4
5
6
7
8
9
10
|
package
com.anxpp.utils;
import
javax.script.scriptengine;
import
javax.script.scriptenginemanager;
import
javax.script.scriptexception;
public
final
class
calculator {
private
final
static
scriptengine jse =
new
scriptenginemanager().getenginebyname(
"javascript"
);
public
static
object cal(string expression)
throws
scriptexception{
return
jse.eval(expression);
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:http://blog.csdn.net/anxpp/article/details/51512200 。
最后此篇关于详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)的文章就讲到这里了,如果你想了解更多关于详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
在进行 BIOS 开发时,我看到了对英特尔 BIOS 引用设计和英特尔固件支持包 (FSP) 的引用。英特尔是否积极支持两者,或者引用设计正在逐步淘汰以支持 FSP。 最佳答案 为了创建 FSP,引用
我知道有一些程序,比如 lojack 用于安装在 BIOS 上的笔记本电脑,但我仍然有点困惑。在阅读有关 lojack 的信息时,在我看来,在用户登录并尝试访问互联网之前,他们无法完全定位笔记本电脑的
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 8年前关闭。 Improve thi
我对 BIO 例程 BIO_read()/BIO_write() 和 SSL_read()/ 之间的区别感到困惑SSL_write() 当 BIO 是内存 BIO 而不是套接字 BIO 时。 我正在尝
我想知道什么样的序列号wmic bios get serialnumber Windows命令实际上给了你什么? 是你主板的序列号吗?文档对此不清楚。 最佳答案 wmic bios get seria
我正在尝试使用 OpenSSL 在 C++ 中创建一个简单的 FTP/FTPS 客户端实现。我已经设法使用 BIO API 与普通 FTP 一起工作。现在的问题是:一旦我有一个不安全的连接和 BIO
pe启动acpl bios error是一个非常简单解决的问题,一般都是主板不支持支持ACPI造成的,只要禁用一下就可以解决了,想要解决的用户可以来看看详细的内容。 pe启动acpl b
这里是代码的上下文: void THREAD_CC server_thread(void *arg) { BIO *client = (BIO *)arg; ... } 表达式
我正在使用 UEFI EDK2 创建 BIOS。我修改了 FDF 以将驱动程序(UEFI 和旧版本)从主固件卷移动到我严格创建的单独固件卷 (FV) 以保存驱动程序。 在我从主 FV 移动驱动程序之前
我们正在运行 32 位和 64 位的 windows xp pro service pack 3。我们正在使用 WMI 来获取 BIOS 制造商和型号,但我们确实更喜欢使用 Win32 API 或汇编
OpenSSL 中的 BIO 对到底是什么?它的用途是什么?我已经检查过 OpenSSL 文档,但任何细节都很少。 最佳答案 OpenSSL 中的 BIO 类似于文件句柄。您可以使用一对它们来安全地相
在此示例代码中: BIO *bio1 = BIO_new(BIO_s_mem()); BIO *bio2 = BIO_new(BIO_s_mem()); SSL_set_bio(ssl, bio1,
我试图将 QEMU 与我正在构建的内核 ISO 一起使用,但我无法运行测试。 我使用的是 Windows 10 64 位,并添加了 pc-bios如果这很重要,请将文件夹添加到 PATH。 到目前为止
我处于不幸的情况,我需要使用 BIOS 未在 ia32 功能控制 MSR 寄存器中启用的 CPU 功能。 BIOS 确实设置了锁定位,因此我无法自己设置该位。 BIOS (Asus UEFI BIOS
我升级了我的 mac 安装,Mountain Lion 10.8.4,但现在每次我尝试加载 Android AVD 时“Eclipse”都会出错。返回的错误是这样的: qemu: 无法加载 PC BI
下面是一些示例代码,展示了我如何使用 OpenSSL: BIO *CreateMemoryBIO() { if (BIO *bio = BIO_new(BIO_s_mem())) {
BIOS 是用汇编语言编写的,机器只能理解二进制文件。 BIOS 是系统启动时加载到内存中的第一个程序。什么编译BIOS生成二进制文件? 最佳答案 BIOS 工程师用 x86 汇编语言编写 BIOS,
使用 OpenGL VBO 时,您可以交错数据 例如,您甚至可以将顶点数据与供 CPU 而不是 GPU 使用的其他数据交错。 交错是有助于还是阻碍主流独立显卡和集成显卡的性能? 最佳答案 一般的答案是
我目前正在编写一个引导加载程序,旨在加载一个比引导扇区允许的时间更长的程序。但是,每次运行程序(我在Virtualbox和QEMU中都测试过),磁盘读取失败,磁盘重置也失败。 bootloader 被
我正在开发一个操作系统项目,使用 isolinux (syslinux 4.5) 作为引导加载程序,加载我的内核与组织在 0x200000 的多重引导头文件。 据我所知,内核已经处于 32 位保护模式
我是一名优秀的程序员,十分优秀!