- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
IO流模块:经常看、经常用、经常忘; 。
在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例; 。
客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式; 。
Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」; 。
BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束; 。
这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源; 。
【 服务端 】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠; 。
public class SocketServer01 {
public static void main(String[] args) throws Exception {
// 1、创建Socket服务端
ServerSocket serverSocket = new ServerSocket(8080);
// 2、方法阻塞等待,直到有客户端连接
Socket socket = serverSocket.accept();
// 3、输入流,输出流
InputStream inStream = socket.getInputStream();
OutputStream outStream = socket.getOutputStream();
// 4、数据接收和响应
int readLen = 0;
byte[] buf = new byte[1024];
if ((readLen=inStream.read(buf)) != -1){
// 接收数据
String readVar = new String(buf, 0, readLen) ;
System.out.println("readVar======="+readVar);
}
// 响应数据
Thread.sleep(10000);
outStream.write("sever-8080-write;".getBytes());
// 5、资源关闭
IoClose.ioClose(outStream,inStream,socket,serverSocket);
}
}
【 客户端 】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态; 。
public class SocketClient01 {
public static void main(String[] args) throws Exception {
// 1、创建Socket客户端
Socket socket = new Socket(InetAddress.getLocalHost(), 8080);
// 2、输入流,输出流
OutputStream outStream = socket.getOutputStream();
InputStream inStream = socket.getInputStream();
// 3、数据发送和响应接收
// 发送数据
outStream.write("client-hello".getBytes());
// 接收数据
int readLen = 0;
byte[] buf = new byte[1024];
if ((readLen=inStream.read(buf)) != -1){
String readVar = new String(buf, 0, readLen) ;
System.out.println("readVar======="+readVar);
}
// 4、资源关闭
IoClose.ioClose(inStream,outStream,socket);
}
}
NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升; 。
这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理; 。
【 服务端 】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求; 。
public class SocketServer01 {
public static void main(String[] args) throws Exception {
try {
//启动服务开启监听
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));
// 设置非阻塞,接受客户端
socketChannel.configureBlocking(false);
// 打开多路复用器
Selector selector = Selector.open();
// 服务端Socket注册到多路复用器,指定兴趣事件
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 多路复用器轮询
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (selector.select() > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();
while (selectionKeyIter.hasNext()){
SelectionKey selectionKey = selectionKeyIter.next() ;
selectionKeyIter.remove();
if(selectionKey.isAcceptable()) {
// 接受新的连接
SocketChannel client = socketChannel.accept();
// 设置读非阻塞
client.configureBlocking(false);
// 注册到多路复用器
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 通道可读
SocketChannel client = (SocketChannel) selectionKey.channel();
int len = client.read(buffer);
if (len > 0){
buffer.flip();
byte[] readArr = new byte[buffer.limit()];
buffer.get(readArr);
System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));
buffer.clear();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
【 客户端 】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据; 。
public class SocketClient01 {
public static void main(String[] args) throws Exception {
try {
// 连接服务端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
String conVar = "client-hello";
writeBuffer.put(conVar.getBytes());
writeBuffer.flip();
// 每隔3S发送一次数据
while (true) {
Thread.sleep(3000);
writeBuffer.rewind();
socketChannel.write(writeBuffer);
writeBuffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的; 。
这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:
【 服务端 】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果; 。
public class SocketServer01 {
public static void main(String[] args) throws Exception {
// 启动服务开启监听
AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;
socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));
// 指定30秒内获取客户端连接,否则超时
Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();
AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);
if (asyChannel != null && asyChannel.isOpen()){
// 读数据
ByteBuffer inBuffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = asyChannel.read(inBuffer);
readResult.get();
System.out.println("read:"+new String(inBuffer.array()));
// 写数据
inBuffer.flip();
Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));
writeResult.get();
}
// 关闭资源
asyChannel.close();
}
}
【 客户端 】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果; 。
public class SocketClient01 {
public static void main(String[] args) throws Exception {
// 连接服务端
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
result.get();
// 写数据
String conVar = "client-hello";
ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());
Future<Integer> writeFuture = socketChannel.write(reqBuffer);
writeFuture.get();
// 读数据
ByteBuffer inBuffer = ByteBuffer.allocate(1024);
Future<Integer> readFuture = socketChannel.read(inBuffer);
readFuture.get();
System.out.println("read:"+new String(inBuffer.array()));
// 关闭资源
socketChannel.close();
}
}
这部分内容,可以参考「 Doug Lea的《IO》 」文档,查看更多细节; 。
Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理; 。
Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」; 。
【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发; 。
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务; 。
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理; 。
【4】在Handler中,会完成相应的业务流程; 。
这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题; 。
【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发; 。
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务; 。
【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理; 。
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理; 。
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应; 。
这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题; 。
【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发; 。
【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor; 。
【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理; 。
【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理; 。
【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应; 。
这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式; 。
【 服务端 】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型; 。
@Slf4j
public class NettyServer {
public static void main(String[] args) {
// EventLoop组,处理事件和IO
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
// 服务端启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());
// 异步IO的结果
ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
class ServerChannelInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 获取管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 编码、解码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 添加自定义的handler
pipeline.addLast("serverHandler", new ServerHandler());
}
}
class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 通道读和写
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server-Msg【"+msg+"】");
TimeUnit.MILLISECONDS.sleep(2000);
String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
ctx.channel().writeAndFlush("hello-client;time:" + nowTime);
ctx.fireChannelActive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
【 客户端 】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在 8989 端口,然后服务端和客户端进行通信; 。
public class NettyClient {
public static void main(String[] args) {
// EventLoop处理事件和IO
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端通道引导
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class).handler(new ClientChannelInit());
// 异步IO的结果
ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
class ClientChannelInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 获取管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 编码、解码器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 添加自定义的handler
pipeline.addLast("clientHandler", new ClientHandler());
}
}
class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道读和写
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client-Msg【"+msg+"】");
TimeUnit.MILLISECONDS.sleep(2000);
String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
ctx.channel().writeAndFlush("hello-server;time:" + nowTime);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("channel...active");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
编程文档:
https://gitee.com/cicadasmile/butte-java-note
应用仓库:
https://gitee.com/cicadasmile/butte-flyer-parent
最后此篇关于IO流中「线程」模型总结的文章就讲到这里了,如果你想了解更多关于IO流中「线程」模型总结的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
可不可以命名为MVVM模型?因为View通过查看模型数据。 View 是否应该只与 ViewModelData 交互?我确实在某处读到正确的 MVVM 模型应该在 ViewModel 而不是 Mode
我正在阅读有关设计模式的文章,虽然作者们都认为观察者模式很酷,但在设计方面,每个人都在谈论 MVC。 我有点困惑,MVC 图不是循环的,代码流具有闭合拓扑不是很自然吗?为什么没有人谈论这种模式: mo
我正在开发一个 Sticky Notes 项目并在 WPF 中做 UI,显然将 MVVM 作为我的架构设计选择。我正在重新考虑我的模型、 View 和 View 模型应该是什么。 我有一个名为 Not
不要混淆:How can I convert List to Hashtable in C#? 我有一个模型列表,我想将它们组织成一个哈希表,以枚举作为键,模型列表(具有枚举的值)作为值。 publi
我只是花了一些时间阅读这些术语(我不经常使用它们,因为我们没有任何 MVC 应用程序,我通常只说“模型”),但我觉得根据上下文,这些意味着不同的东西: 实体 这很简单,它是数据库中的一行: 2) In
我想知道你们中是否有人知道一些很好的教程来解释大型应用程序的 MVVM。我发现关于 MVVM 的每个教程都只是基础知识解释(如何实现模型、 View 模型和 View ),但我对在应用程序页面之间传递
我想realm.delete() 我的 Realm 中除了一个模型之外的所有模型。有什么办法可以不列出所有这些吗? 也许是一种遍历 Realm 中当前存在的所有类型的方法? 最佳答案 您可以从您的 R
我正在尝试使用 alias 指令模拟一个 Eloquent 模型,如下所示: $transporter = \Mockery::mock('alias:' . Transporter::class)
我正在使用 stargazer 创建我的 plm 汇总表。 library(plm) library(pglm) data("Unions", package = "pglm") anb1 <- pl
我读了几篇与 ASP.NET 分层架构相关的文章和问题,但是读得太多后我有点困惑。 UI 层是在 ASP.NET MVC 中开发的,对于数据访问,我在项目中使用 EF。 我想通过一个例子来描述我的问题
我收到此消息错误: Inceptionv3.mlmodel: unable to read document 我下载了最新版本的 xcode。 9.4 版测试版 (9Q1004a) 最佳答案 您没有
(同样,一个 MVC 验证问题。我知道,我知道......) 我想使用 AutoMapper ( http://automapper.codeplex.com/ ) 来验证我的创建 View 中不在我
需要澄清一件事,现在我正在处理一个流程,其中我有两个 View 模型,一个依赖于另一个 View 模型,为了处理这件事,我尝试在我的基本 Activity 中注入(inject)两个 View 模型,
如果 WPF MVVM 应该没有代码,为什么在使用 ICommand 时,是否需要在 Window.xaml.cs 代码中实例化 DataContext 属性?我已经并排观看并关注了 YouTube
当我第一次听说 ASP.NET MVC 时,我认为这意味着应用程序由三个部分组成:模型、 View 和 Controller 。 然后我读到 NerdDinner并学习了存储库和 View 模型的方法
Platform : ubuntu 16.04 Python version: 3.5.2 mmdnn version : 0.2.5 Source framework with version :
我正在学习本教程:https://www.raywenderlich.com/160728/object-oriented-programming-swift ...并尝试对代码进行一些个人调整,看看
我正试图围绕 AngularJS。我很喜欢它,但一个核心概念似乎在逃避我——模型在哪里? 例如,如果我有一个显示多个交易列表的应用程序。一个列表向服务器查询匹配某些条件的分页事务集,另一个列表使用不同
我在为某个应用程序找出最佳方法时遇到了麻烦。我不太习惯取代旧 TLA(三层架构)的新架构,所以这就是我的来源。 在为我的应用程序(POCO 类,对吧??)设计模型和 DAL 时,我有以下疑问: 我的模
我有两个模型:Person 和 Department。每个人可以在一个部门工作。部门可以由多人管理。我不确定如何在 Django 模型中构建这种关系。 这是我不成功的尝试之一 [models.py]:
我是一名优秀的程序员,十分优秀!