- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
本文会从传统的bio到nio再到aio自浅至深介绍,并附上完整的代码讲解.
下面代码中会使用这样一个例子:客户端发送一段算式的字符串到服务器,服务器计算后返回结果到客户端.
代码的所有说明,都直接作为注释,嵌入到代码中,看代码时就能更容易理解,代码中会用到一个计算结果的工具类,见文章代码部分.
相关的基础知识文章推荐: java 并发(多线程) 。
1、bio编程 。
1.1、传统的bio编程 。
网络编程的基本模型是c/s模型,即两个进程间的通信.
服务端提供ip和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信.
传统的同步阻塞模型开发中,serversocket负责绑定ip地址,启动监听端口;socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 。
简单的描述一下bio的服务端通信模型:采用bio通信模型的服务端,通常由一个独立的acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型.
传统bio通信模型图:
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死-掉-了.
同步阻塞式i/o创建的server源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.net.serversocket;
import
java.net.socket;
/**
* bio服务端源码
* @author yangtao__anxpp.com
* @version 1.0
*/
public
final
class
servernormal {
//默认的端口号
private
static
int
default_port =
12345
;
//单例的serversocket
private
static
serversocket server;
//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public
static
void
start()
throws
ioexception{
//使用默认值
start(default_port);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public
synchronized
static
void
start(
int
port)
throws
ioexception{
if
(server !=
null
)
return
;
try
{
//通过构造函数创建serversocket
//如果端口合法且空闲,服务端就监听成功
server =
new
serversocket(port);
system.out.println(
"服务器已启动,端口号:"
+ port);
//通过无线循环监听客户端连接
//如果没有客户端接入,将阻塞在accept操作上。
while
(
true
){
socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条socket链路
new
thread(
new
serverhandler(socket)).start();
}
}
finally
{
//一些必要的清理工作
if
(server !=
null
){
system.out.println(
"服务器已关闭。"
);
server.close();
server =
null
;
}
}
}
}
|
客户端消息处理线程serverhandler源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package
com.anxpp.io.calculator.bio;
import
java.io.bufferedreader;
import
java.io.ioexception;
import
java.io.inputstreamreader;
import
java.io.printwriter;
import
java.net.socket;
import
com.anxpp.io.utils.calculator;
/**
* 客户端线程
* @author yangtao__anxpp.com
* 用于处理一个客户端的socket链路
*/
public
class
serverhandler
implements
runnable{
private
socket socket;
public
serverhandler(socket socket) {
this
.socket = socket;
}
@override
public
void
run() {
bufferedreader in =
null
;
printwriter out =
null
;
try
{
in =
new
bufferedreader(
new
inputstreamreader(socket.getinputstream()));
out =
new
printwriter(socket.getoutputstream(),
true
);
string expression;
string result;
while
(
true
){
//通过bufferedreader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if
((expression = in.readline())==
null
)
break
;
system.out.println(
"服务器收到消息:"
+ expression);
try
{
result = calculator.cal(expression).tostring();
}
catch
(exception e){
result =
"计算错误:"
+ e.getmessage();
}
out.println(result);
}
}
catch
(exception e){
e.printstacktrace();
}
finally
{
//一些必要的清理工作
if
(in !=
null
){
try
{
in.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
in =
null
;
}
if
(out !=
null
){
out.close();
out =
null
;
}
if
(socket !=
null
){
try
{
socket.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
socket =
null
;
}
}
}
}
|
同步阻塞式i/o创建的client源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package
com.anxpp.io.calculator.bio;
import
java.io.bufferedreader;
import
java.io.ioexception;
import
java.io.inputstreamreader;
import
java.io.printwriter;
import
java.net.socket;
/**
* 阻塞式i/o创建的客户端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
client {
//默认的端口号
private
static
int
default_server_port =
12345
;
private
static
string default_server_ip =
"127.0.0.1"
;
public
static
void
send(string expression){
send(default_server_port,expression);
}
public
static
void
send(
int
port,string expression){
system.out.println(
"算术表达式为:"
+ expression);
socket socket =
null
;
bufferedreader in =
null
;
printwriter out =
null
;
try
{
socket =
new
socket(default_server_ip,port);
in =
new
bufferedreader(
new
inputstreamreader(socket.getinputstream()));
out =
new
printwriter(socket.getoutputstream(),
true
);
out.println(expression);
system.out.println(
"___结果为:"
+ in.readline());
}
catch
(exception e){
e.printstacktrace();
}
finally
{
//一下必要的清理工作
if
(in !=
null
){
try
{
in.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
in =
null
;
}
if
(out !=
null
){
out.close();
out =
null
;
}
if
(socket !=
null
){
try
{
socket.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
socket =
null
;
}
}
}
}
|
测试代码,为了方便在控制台看输出结果,放到同一个程序(jvm)中运行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.util.random;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
public
static
void
main(string[] args)
throws
interruptedexception {
//运行服务器
new
thread(
new
runnable() {
@override
public
void
run() {
try
{
serverbetter.start();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
}).start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
char
operators[] = {
'+'
,
'-'
,
'*'
,
'/'
};
random random =
new
random(system.currenttimemillis());
new
thread(
new
runnable() {
@suppresswarnings
(
"static-access"
)
@override
public
void
run() {
while
(
true
){
//随机产生算术表达式
string expression = random.nextint(
10
)+
""
+operators[random.nextint(
4
)]+(random.nextint(
10
)+
1
);
client.send(expression);
try
{
thread.currentthread().sleep(random.nextint(
1000
));
}
catch
(interruptedexception e) {
e.printstacktrace();
}
}
}
}).start();
}
}
|
其中一次的运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
服务器已启动,端口号:
12345
算术表达式为:
4
-
2
服务器收到消息:
4
-
2
___结果为:
2
算术表达式为:
5
-
10
服务器收到消息:
5
-
10
___结果为:-
5
算术表达式为:
0
-
9
服务器收到消息:
0
-
9
___结果为:-
9
算术表达式为:
0
+
6
服务器收到消息:
0
+
6
___结果为:
6
算术表达式为:
1
/
6
服务器收到消息:
1
/
6
___结果为:
0.16666666666666666
...
|
从以上代码,很容易看出,bio主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工).
1.2、伪异步i/o编程 。
为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程(需要了解更多请参考前面提供的文章),实现1个或多个线程处理n个客户端的模型(但是底层还是使用的同步阻塞i/o),通常被称为“伪异步i/o模型“.
伪异步i/o模型图:
实现很简单,我们只需要将新建线程的地方,交给线程池管理即可,只需要改动刚刚的server代码即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package
com.anxpp.io.calculator.bio;
import
java.io.ioexception;
import
java.net.serversocket;
import
java.net.socket;
import
java.util.concurrent.executorservice;
import
java.util.concurrent.executors;
/**
* bio服务端源码__伪异步i/o
* @author yangtao__anxpp.com
* @version 1.0
*/
public
final
class
serverbetter {
//默认的端口号
private
static
int
default_port =
12345
;
//单例的serversocket
private
static
serversocket server;
//线程池 懒汉式的单例
private
static
executorservice executorservice = executors.newfixedthreadpool(
60
);
//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public
static
void
start()
throws
ioexception{
//使用默认值
start(default_port);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public
synchronized
static
void
start(
int
port)
throws
ioexception{
if
(server !=
null
)
return
;
try
{
//通过构造函数创建serversocket
//如果端口合法且空闲,服务端就监听成功
server =
new
serversocket(port);
system.out.println(
"服务器已启动,端口号:"
+ port);
//通过无线循环监听客户端连接
//如果没有客户端接入,将阻塞在accept操作上。
while
(
true
){
socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条socket链路
executorservice.execute(
new
serverhandler(socket));
}
}
finally
{
//一些必要的清理工作
if
(server !=
null
){
system.out.println(
"服务器已关闭。"
);
server.close();
server =
null
;
}
}
}
}
|
测试运行结果是一样的.
我们知道,如果使用cachedthreadpool线程池(不限制线程数量,如果不清楚请参考文首提供的文章),其实除了能自动帮我们管理线程(复用),看起来也就像是1:1的客户端:线程数模型,而使用fixedthreadpool我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了n:m的伪异步i/o模型.
但是,正因为限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对socket的输入流就行读取时,会一直阻塞,直到发生:
所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端.
而后面即将介绍的nio,就能解决这个难题.
2、nio 编程 。
jdk 1.4中的java.nio.*包中引入新的java i/o库,其目的是提高速度。实际上,“旧”的i/o包已经使用nio重新实现过,即使我们不显式的使用nio编程,也能从中受益。速度的提高在文件i/o和网络i/o中都可能会发生,但本文只讨论后者.
2.1、简介 。
nio我们一般认为是new i/o(也是官方的叫法),因为它是相对于老的i/o类库新增的(其实在jdk 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为non-block i/o,即非阻塞i/o,因为这样叫,更能体现它的特点。而下文中的nio,不是指整个新的i/o库,而是非阻塞i/o.
nio提供了与传统bio模型中的socket和serversocket相对应的socketchannel和serversocketchannel两种不同的套接字通道实现.
新增的着两种通道都支持阻塞和非阻塞两种模式.
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反.
对于低负载、低并发的应用程序,可以使用同步阻塞i/o来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用nio的非阻塞模式来开发.
下面会先对基础知识进行介绍.
2.2、缓冲区 buffer 。
buffer是一个对象,包含一些要写入或者读出的数据.
在nio库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问nio中的数据,都是通过缓冲区进行操作.
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息.
具体的缓存区有这些:bytebuffe、charbuffer、 shortbuffer、intbuffer、longbuffer、floatbuffer、doublebuffer。他们实现了相同的接口:buffer.
2.3、通道 channel 。
我们对数据的读取和写入要通过channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作.
底层的操作系统的通道一般都是全双工的,所以全双工的channel比流能更好的映射底层操作系统的api.
channel主要分两大类:
后面代码会涉及的serversocketchannel和socketchannel都是selectablechannel的子类.
2.4、多路复用器 selector 。
selector是java nio 编程的基础.
selector提供选择已经就绪的任务的能力:selector会不断轮询注册在其上的channel,如果某个channel上面发生读或者写事件,这个channel就处于就绪状态,会被selector轮询出来,然后通过selectionkey可以获取就绪channel的集合,进行后续的i/o操作.
一个selector可以同时轮询多个channel,因为jdk使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责selector的轮询,就可以接入成千上万的客户端.
2.5、nio服务端 。
代码比传统的socket编程看起来要复杂不少.
直接贴代码吧,以注释的形式给出代码说明.
nio创建的server源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package
com.anxpp.io.calculator.nio;
public
class
server {
private
static
int
default_port =
12345
;
private
static
serverhandle serverhandle;
public
static
void
start(){
start(default_port);
}
public
static
synchronized
void
start(
int
port){
if
(serverhandle!=
null
)
serverhandle.stop();
serverhandle =
new
serverhandle(port);
new
thread(serverhandle,
"server"
).start();
}
public
static
void
main(string[] args){
start();
}
}
|
serverhandle:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
package
com.anxpp.io.calculator.nio;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.selectionkey;
import
java.nio.channels.selector;
import
java.nio.channels.serversocketchannel;
import
java.nio.channels.socketchannel;
import
java.util.iterator;
import
java.util.set;
import
com.anxpp.io.utils.calculator;
/**
* nio服务端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
serverhandle
implements
runnable{
private
selector selector;
private
serversocketchannel serverchannel;
private
volatile
boolean
started;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public
serverhandle(
int
port) {
try
{
//创建选择器
selector = selector.open();
//打开监听通道
serverchannel = serversocketchannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverchannel.configureblocking(
false
);
//开启非阻塞模式
//绑定端口 backlog设为1024
serverchannel.socket().bind(
new
inetsocketaddress(port),
1024
);
//监听客户端连接请求
serverchannel.register(selector, selectionkey.op_accept);
//标记服务器已开启
started =
true
;
system.out.println(
"服务器已启动,端口号:"
+ port);
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
}
public
void
stop(){
started =
false
;
}
@override
public
void
run() {
//循环遍历selector
while
(started){
try
{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(
1000
);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
set<selectionkey> keys = selector.selectedkeys();
iterator<selectionkey> it = keys.iterator();
selectionkey key =
null
;
while
(it.hasnext()){
key = it.next();
it.remove();
try
{
handleinput(key);
}
catch
(exception e){
if
(key !=
null
){
key.cancel();
if
(key.channel() !=
null
){
key.channel().close();
}
}
}
}
}
catch
(throwable t){
t.printstacktrace();
}
}
//selector关闭后会自动释放里面管理的资源
if
(selector !=
null
)
try
{
selector.close();
}
catch
(exception e) {
e.printstacktrace();
}
}
private
void
handleinput(selectionkey key)
throws
ioexception{
if
(key.isvalid()){
//处理新接入的请求消息
if
(key.isacceptable()){
serversocketchannel ssc = (serversocketchannel) key.channel();
//通过serversocketchannel的accept创建socketchannel实例
//完成该操作意味着完成tcp三次握手,tcp物理链路正式建立
socketchannel sc = ssc.accept();
//设置为非阻塞的
sc.configureblocking(
false
);
//注册为读
sc.register(selector, selectionkey.op_read);
}
//读消息
if
(key.isreadable()){
socketchannel sc = (socketchannel) key.channel();
//创建bytebuffer,并开辟一个1m的缓冲区
bytebuffer buffer = bytebuffer.allocate(
1024
);
//读取请求码流,返回读取到的字节数
int
readbytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if
(readbytes>
0
){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte
[] bytes =
new
byte
[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
string expression =
new
string(bytes,
"utf-8"
);
system.out.println(
"服务器收到消息:"
+ expression);
//处理数据
string result =
null
;
try
{
result = calculator.cal(expression).tostring();
}
catch
(exception e){
result =
"计算错误:"
+ e.getmessage();
}
//发送应答消息
dowrite(sc,result);
}
//没有读取到字节 忽略
// else if(readbytes==0);
//链路已经关闭,释放资源
else
if
(readbytes<
0
){
key.cancel();
sc.close();
}
}
}
}
//异步发送应答消息
private
void
dowrite(socketchannel channel,string response)
throws
ioexception{
//将消息编码为字节数组
byte
[] bytes = response.getbytes();
//根据数组容量创建bytebuffer
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writebuffer.put(bytes);
//flip操作
writebuffer.flip();
//发送缓冲区的字节数组
channel.write(writebuffer);
//****此处不含处理“写半包”的代码
}
}
|
可以看到,创建nio服务端的主要步骤如下:
因为应答消息的发送,socketchannel也是异步非阻塞的,所以不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询selector将没有发送完的消息发送完毕,然后通过buffer的hasremain()方法判断消息是否发送完成.
2.6、nio客户端 。
还是直接上代码吧,过程也不需要太多解释了,跟服务端代码有点类似.
client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
com.anxpp.io.calculator.nio;
public
class
client {
private
static
string default_host =
"127.0.0.1"
;
private
static
int
default_port =
12345
;
private
static
clienthandle clienthandle;
public
static
void
start(){
start(default_host,default_port);
}
public
static
synchronized
void
start(string ip,
int
port){
if
(clienthandle!=
null
)
clienthandle.stop();
clienthandle =
new
clienthandle(ip,port);
new
thread(clienthandle,
"server"
).start();
}
//向服务器发送消息
public
static
boolean
sendmsg(string msg)
throws
exception{
if
(msg.equals(
"q"
))
return
false
;
clienthandle.sendmsg(msg);
return
true
;
}
public
static
void
main(string[] args){
start();
}
}
|
clienthandle:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
package
com.anxpp.io.calculator.nio;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.selectionkey;
import
java.nio.channels.selector;
import
java.nio.channels.socketchannel;
import
java.util.iterator;
import
java.util.set;
/**
* nio客户端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
clienthandle
implements
runnable{
private
string host;
private
int
port;
private
selector selector;
private
socketchannel socketchannel;
private
volatile
boolean
started;
public
clienthandle(string ip,
int
port) {
this
.host = ip;
this
.port = port;
try
{
//创建选择器
selector = selector.open();
//打开监听通道
socketchannel = socketchannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketchannel.configureblocking(
false
);
//开启非阻塞模式
started =
true
;
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
}
public
void
stop(){
started =
false
;
}
@override
public
void
run() {
try
{
doconnect();
}
catch
(ioexception e){
e.printstacktrace();
system.exit(
1
);
}
//循环遍历selector
while
(started){
try
{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(
1000
);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
set<selectionkey> keys = selector.selectedkeys();
iterator<selectionkey> it = keys.iterator();
selectionkey key =
null
;
while
(it.hasnext()){
key = it.next();
it.remove();
try
{
handleinput(key);
}
catch
(exception e){
if
(key !=
null
){
key.cancel();
if
(key.channel() !=
null
){
key.channel().close();
}
}
}
}
}
catch
(exception e){
e.printstacktrace();
system.exit(
1
);
}
}
//selector关闭后会自动释放里面管理的资源
if
(selector !=
null
)
try
{
selector.close();
}
catch
(exception e) {
e.printstacktrace();
}
}
private
void
handleinput(selectionkey key)
throws
ioexception{
if
(key.isvalid()){
socketchannel sc = (socketchannel) key.channel();
if
(key.isconnectable()){
if
(sc.finishconnect());
else
system.exit(
1
);
}
//读消息
if
(key.isreadable()){
//创建bytebuffer,并开辟一个1m的缓冲区
bytebuffer buffer = bytebuffer.allocate(
1024
);
//读取请求码流,返回读取到的字节数
int
readbytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if
(readbytes>
0
){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte
[] bytes =
new
byte
[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
string result =
new
string(bytes,
"utf-8"
);
system.out.println(
"客户端收到消息:"
+ result);
}
//没有读取到字节 忽略
// else if(readbytes==0);
//链路已经关闭,释放资源
else
if
(readbytes<
0
){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private
void
dowrite(socketchannel channel,string request)
throws
ioexception{
//将消息编码为字节数组
byte
[] bytes = request.getbytes();
//根据数组容量创建bytebuffer
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writebuffer.put(bytes);
//flip操作
writebuffer.flip();
//发送缓冲区的字节数组
channel.write(writebuffer);
//****此处不含处理“写半包”的代码
}
private
void
doconnect()
throws
ioexception{
if
(socketchannel.connect(
new
inetsocketaddress(host,port)));
else
socketchannel.register(selector, selectionkey.op_connect);
}
public
void
sendmsg(string msg)
throws
exception{
socketchannel.register(selector, selectionkey.op_read);
dowrite(socketchannel, msg);
}
}
|
2.7、演示结果 。
首先运行服务器,顺便也运行一个客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package
com.anxpp.io.calculator.nio;
import
java.util.scanner;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
//运行服务器
server.start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
client.start();
while
(client.sendmsg(
new
scanner(system.in).nextline()));
}
}
|
我们也可以单独运行客户端,效果都是一样的.
一次测试的结果:
1
2
3
4
5
6
7
|
服务器已启动,端口号:
12345
1
+
2
+
3
+
4
+
5
+
6
服务器收到消息:
1
+
2
+
3
+
4
+
5
+
6
客户端收到消息:
21
1
*
2
/
3
-
4
+
5
*
6
/
7
-
8
服务器收到消息:
1
*
2
/
3
-
4
+
5
*
6
/
7
-
8
客户端收到消息:-
7.0476190476190474
|
运行多个客户端,都是没有问题的.
3、aio编程 。
nio 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现.
异步的套接字通道时真正的异步非阻塞i/o,对应于unix网络编程中的事件驱动i/o(aio)。他不需要过多的selector对注册的通道进行轮询即可实现异步读写,从而简化了nio的编程模型.
直接上代码吧.
3.1、server端代码 。
server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
com.anxpp.io.calculator.aio.server;
/**
* aio服务端
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
server {
private
static
int
default_port =
12345
;
private
static
asyncserverhandler serverhandle;
public
volatile
static
long
clientcount =
0
;
public
static
void
start(){
start(default_port);
}
public
static
synchronized
void
start(
int
port){
if
(serverhandle!=
null
)
return
;
serverhandle =
new
asyncserverhandler(port);
new
thread(serverhandle,
"server"
).start();
}
public
static
void
main(string[] args){
server.start();
}
}
|
asyncserverhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package
com.anxpp.io.calculator.aio.server;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.channels.asynchronousserversocketchannel;
import
java.util.concurrent.countdownlatch;
public
class
asyncserverhandler
implements
runnable {
public
countdownlatch latch;
public
asynchronousserversocketchannel channel;
public
asyncserverhandler(
int
port) {
try
{
//创建服务端通道
channel = asynchronousserversocketchannel.open();
//绑定端口
channel.bind(
new
inetsocketaddress(port));
system.out.println(
"服务器已启动,端口号:"
+ port);
}
catch
(ioexception e) {
e.printstacktrace();
}
}
@override
public
void
run() {
//countdownlatch初始化
//它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
//此处,让现场在此阻塞,防止服务端执行完成后退出
//也可以使用while(true)+sleep
//生成环境就不需要担心这个问题,以为服务端是不会退出的
latch =
new
countdownlatch(
1
);
//用于接收客户端的连接
channel.accept(
this
,
new
accepthandler());
try
{
latch.await();
}
catch
(interruptedexception e) {
e.printstacktrace();
}
}
}
|
accepthandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
com.anxpp.io.calculator.aio.server;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
//作为handler接收客户端连接
public
class
accepthandler
implements
completionhandler<asynchronoussocketchannel, asyncserverhandler> {
@override
public
void
completed(asynchronoussocketchannel channel,asyncserverhandler serverhandler) {
//继续接受其他客户端的请求
server.clientcount++;
system.out.println(
"连接的客户端数:"
+ server.clientcount);
serverhandler.channel.accept(serverhandler,
this
);
//创建新的buffer
bytebuffer buffer = bytebuffer.allocate(
1024
);
//异步读 第三个参数为接收消息回调的业务handler
channel.read(buffer, buffer,
new
readhandler(channel));
}
@override
public
void
failed(throwable exc, asyncserverhandler serverhandler) {
exc.printstacktrace();
serverhandler.latch.countdown();
}
}
|
readhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
package
com.anxpp.io.calculator.aio.server;
import
java.io.ioexception;
import
java.io.unsupportedencodingexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
com.anxpp.io.utils.calculator;
public
class
readhandler
implements
completionhandler<integer, bytebuffer> {
//用于读取半包消息和发送应答
private
asynchronoussocketchannel channel;
public
readhandler(asynchronoussocketchannel channel) {
this
.channel = channel;
}
//读取到消息后的处理
@override
public
void
completed(integer result, bytebuffer attachment) {
//flip操作
attachment.flip();
//根据
byte
[] message =
new
byte
[attachment.remaining()];
attachment.get(message);
try
{
string expression =
new
string(message,
"utf-8"
);
system.out.println(
"服务器收到消息: "
+ expression);
string calrresult =
null
;
try
{
calrresult = calculator.cal(expression).tostring();
}
catch
(exception e){
calrresult =
"计算错误:"
+ e.getmessage();
}
//向客户端发送消息
dowrite(calrresult);
}
catch
(unsupportedencodingexception e) {
e.printstacktrace();
}
}
//发送消息
private
void
dowrite(string result) {
byte
[] bytes = result.getbytes();
bytebuffer writebuffer = bytebuffer.allocate(bytes.length);
writebuffer.put(bytes);
writebuffer.flip();
//异步写数据 参数与前面的read一样
channel.write(writebuffer, writebuffer,
new
completionhandler<integer, bytebuffer>() {
@override
public
void
completed(integer result, bytebuffer buffer) {
//如果没有发送完,就继续发送直到完成
if
(buffer.hasremaining())
channel.write(buffer, buffer,
this
);
else
{
//创建新的buffer
bytebuffer readbuffer = bytebuffer.allocate(
1024
);
//异步读 第三个参数为接收消息回调的业务handler
channel.read(readbuffer, readbuffer,
new
readhandler(channel));
}
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
try
{
channel.close();
}
catch
(ioexception e) {
}
}
});
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
try
{
this
.channel.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
}
|
ok,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是api比nio的使用起来真的简单多了,主要就是监听、读、写等各种completionhandler。此处本应有一个writehandler的,确实,我们在readhandler中,以一个匿名内部类实现了它.
下面看客户端代码.
3.2、client端代码 。
client:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package
com.anxpp.io.calculator.aio.client;
import
java.util.scanner;
public
class
client {
private
static
string default_host =
"127.0.0.1"
;
private
static
int
default_port =
12345
;
private
static
asyncclienthandler clienthandle;
public
static
void
start(){
start(default_host,default_port);
}
public
static
synchronized
void
start(string ip,
int
port){
if
(clienthandle!=
null
)
return
;
clienthandle =
new
asyncclienthandler(ip,port);
new
thread(clienthandle,
"client"
).start();
}
//向服务器发送消息
public
static
boolean
sendmsg(string msg)
throws
exception{
if
(msg.equals(
"q"
))
return
false
;
clienthandle.sendmsg(msg);
return
true
;
}
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
client.start();
system.out.println(
"请输入请求消息:"
);
scanner scanner =
new
scanner(system.in);
while
(client.sendmsg(scanner.nextline()));
}
}
|
asyncclienthandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.net.inetsocketaddress;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
asyncclienthandler
implements
completionhandler<
void
, asyncclienthandler>, runnable {
private
asynchronoussocketchannel clientchannel;
private
string host;
private
int
port;
private
countdownlatch latch;
public
asyncclienthandler(string host,
int
port) {
this
.host = host;
this
.port = port;
try
{
//创建异步的客户端通道
clientchannel = asynchronoussocketchannel.open();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
@override
public
void
run() {
//创建countdownlatch等待
latch =
new
countdownlatch(
1
);
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
clientchannel.connect(
new
inetsocketaddress(host, port),
this
,
this
);
try
{
latch.await();
}
catch
(interruptedexception e1) {
e1.printstacktrace();
}
try
{
clientchannel.close();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
//连接服务器成功
//意味着tcp三次握手完成
@override
public
void
completed(
void
result, asyncclienthandler attachment) {
system.out.println(
"客户端成功连接到服务器..."
);
}
//连接服务器失败
@override
public
void
failed(throwable exc, asyncclienthandler attachment) {
system.err.println(
"连接服务器失败..."
);
exc.printstacktrace();
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
e.printstacktrace();
}
}
//向服务器发送消息
public
void
sendmsg(string msg){
byte
[] req = msg.getbytes();
bytebuffer writebuffer = bytebuffer.allocate(req.length);
writebuffer.put(req);
writebuffer.flip();
//异步写
clientchannel.write(writebuffer, writebuffer,
new
writehandler(clientchannel, latch));
}
}
|
writehandler: 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
writehandler
implements
completionhandler<integer, bytebuffer> {
private
asynchronoussocketchannel clientchannel;
private
countdownlatch latch;
public
writehandler(asynchronoussocketchannel clientchannel,countdownlatch latch) {
this
.clientchannel = clientchannel;
this
.latch = latch;
}
@override
public
void
completed(integer result, bytebuffer buffer) {
//完成全部数据的写入
if
(buffer.hasremaining()) {
clientchannel.write(buffer, buffer,
this
);
}
else
{
//读取数据
bytebuffer readbuffer = bytebuffer.allocate(
1024
);
clientchannel.read(readbuffer,readbuffer,
new
readhandler(clientchannel, latch));
}
}
@override
public
void
failed(throwable exc, bytebuffer attachment) {
system.err.println(
"数据发送失败..."
);
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
}
}
}
|
readhandler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package
com.anxpp.io.calculator.aio.client;
import
java.io.ioexception;
import
java.io.unsupportedencodingexception;
import
java.nio.bytebuffer;
import
java.nio.channels.asynchronoussocketchannel;
import
java.nio.channels.completionhandler;
import
java.util.concurrent.countdownlatch;
public
class
readhandler
implements
completionhandler<integer, bytebuffer> {
private
asynchronoussocketchannel clientchannel;
private
countdownlatch latch;
public
readhandler(asynchronoussocketchannel clientchannel,countdownlatch latch) {
this
.clientchannel = clientchannel;
this
.latch = latch;
}
@override
public
void
completed(integer result,bytebuffer buffer) {
buffer.flip();
byte
[] bytes =
new
byte
[buffer.remaining()];
buffer.get(bytes);
string body;
try
{
body =
new
string(bytes,
"utf-8"
);
system.out.println(
"客户端收到结果:"
+ body);
}
catch
(unsupportedencodingexception e) {
e.printstacktrace();
}
}
@override
public
void
failed(throwable exc,bytebuffer attachment) {
system.err.println(
"数据读取失败..."
);
try
{
clientchannel.close();
latch.countdown();
}
catch
(ioexception e) {
}
}
}
|
这个api使用起来真的是很顺手.
3.3、测试 。
test:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package
com.anxpp.io.calculator.aio;
import
java.util.scanner;
import
com.anxpp.io.calculator.aio.client.client;
import
com.anxpp.io.calculator.aio.server.server;
/**
* 测试方法
* @author yangtao__anxpp.com
* @version 1.0
*/
public
class
test {
//测试主方法
@suppresswarnings
(
"resource"
)
public
static
void
main(string[] args)
throws
exception{
//运行服务器
server.start();
//避免客户端先于服务器启动前执行代码
thread.sleep(
100
);
//运行客户端
client.start();
system.out.println(
"请输入请求消息:"
);
scanner scanner =
new
scanner(system.in);
while
(client.sendmsg(scanner.nextline()));
}
}
|
我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发.
读者可以自己修改client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试.
下面是其中一次参数的输出:
1
2
3
4
5
6
7
8
9
10
11
|
服务器已启动,端口号:
12345
请输入请求消息:
客户端成功连接到服务器...
连接的客户端数:
1
123456
+
789
+
456
服务器收到消息:
123456
+
789
+
456
客户端收到结果:
124701
9526
*
56
服务器收到消息:
9526
*
56
客户端收到结果:
533456
...
|
aio是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手.
下面就比较一下,几种i/o编程的优缺点.
4、各种i/o的对比 。
先以一张表来直观的对比一下:
具体选择什么样的模型或者nio框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的nio做服务端;相反,就应考虑使用nio或者相关的框架了.
5、附录 。
上文中服务端使用到的用于计算的工具类:
1
2
3
4
5
6
7
8
9
10
|
package
com.anxpp.utils;
import
javax.script.scriptengine;
import
javax.script.scriptenginemanager;
import
javax.script.scriptexception;
public
final
class
calculator {
private
final
static
scriptengine jse =
new
scriptenginemanager().getenginebyname(
"javascript"
);
public
static
object cal(string expression)
throws
scriptexception{
return
jse.eval(expression);
}
}
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:http://blog.csdn.net/anxpp/article/details/51512200 。
最后此篇关于详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)的文章就讲到这里了,如果你想了解更多关于详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
有人可以解释一下 posix AIO 和 freebsd AIO 之间的区别吗?看起来功能是一样的,aio_read/aio_write。 我了解到linux native AIO只有在linux下才
最近在研究linux io模型,经常听说linux和windows IOCP一样,有自己的aio模型实现:Glibc aio和kernel aio。 问题: (1) 当人们说“linux native
我正在尝试在 Linux 上使用异步 io。据我所知,有 3 个选项: 内核调用(io_submit 和 friend ) libRT - 在用户空间使用线程 libRTKAIO - 不使用线程的内核
一 需求 使用 AsynchronousServerSocketChannel 搭建服务端,使用 AsynchronousSokectChannel 搭建客户端,完成客户端和服务端的一次通信。 二 实
我希望 aio 在读取操作完成时向我的程序发出信号,并根据 this page ,这样的通知可以通过内核发送的信号接收,也可以通过启动运行用户函数的线程来接收。可以通过设置 sigev_notify
根据 this tutorial在 linux 上使用 AIO 可以很容易地实现异步磁盘文件 io,至少从编程/api 的角度来看是这样。但在本教程之前和之后,我阅读了很多帖子和文章,认为这要么无法完
一丶IO模型&Java IO Unix为程序员提供了以下5种基本的io模型: blocking io: 阻塞io nonblocking io: 非阻塞io
介绍 什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll跟IO模型有什么关系?有几种经典IO模型呢?BIO、NIO、AIO到底有什么区别的? 什么是I
谁能告诉我一些使用内核 AIO(即 io_submit() 系列)的(最好是常用的)应用程序,例如任何 SQL/非 SQL 数据库等?我希望它能够在每个线程上发出队列深度超过 1 的异步读取,以完全饱
根据关于 aio_read/write 的文档,AIO 库基本上可以通过两种方式通知您的应用程序异步文件 I/O 操作已完成。要么 1) 您可以使用信号,2) 您可以使用回调函数 我认为回调函数比信号
我正在尝试在 ubuntu 上编译 nginx,我正在使用以下配置参数: ./configure \ --prefix=/usr/share/nginx \ --conf-path=/etc/
我正在玩 Java NIO2,我正在尝试编写一个基于 AsynchronousServerSocketChannel 的 TCP 聊天服务器。 对一个简单的 ECHO-Server 进行编程没有问题并
我的应用程序有时会因 SIGIO 或 SIGUSR1 信号而终止,即使我已经阻止了这些信号。 我的主线程从阻塞 SIGIO 和 SIGUSR1 开始,然后进行 2 个 AIO 读取操作。这些操作使用线
有谁知道我在哪里可以获得有关最新 Linux 内核对 aio 的内核支持状态的最新信息? Google 搜索显示的网页可能已经过时得无可救药了。 编辑: 更具体地说,我对非文件相关的描述符感兴趣,例如
我正在尝试使用 C++ 异步读取和写入磁盘(使用 Ubuntu 10.04 中的 posix aio 库),遵循此处概述的说明:aio tutorial .我可以异步读写,但恐怕会出现某种小的内存泄漏
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 8 年前。 Improve this qu
在 Linux 中,您可以读取 /proc/sys/fs/aio-nr 的值,这将返回总数。在系统中所有事件的 aio 上下文中分配的事件。最大值由 /proc/sys/fs/aio-max-nr 控
作为调试 IO 生成器的一部分,我在 x64 硬件上运行的 RH 6.4 下遇到了一组奇怪的结果(在多个系统上验证): -- 应用程序使用 linux 异步 IO(即非 posix)访问原始磁盘分区(
使用内核 AIO 和 O_DIRECT|O_SYNC,不会复制到内核缓冲区,并且可以在数据实际刷新到磁盘时获得细粒度通知。但是,它需要将数据保存在 io_prep_pwrite() 的用户空间缓冲区中
我正在编写一个C程序,通过直接从原始块设备文件读取来从SSD驱动器读取数据。 我正在尝试Linux AIO(我正在谈论Linux AIO API,即linuxaio.h提供的功能,例如io_submi
我是一名优秀的程序员,十分优秀!