- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
1.1 原生NIO存在的问题
1.2 概述
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序。Netty是一个基于NIO的网络编程框架,使用Netty可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程。知名的Elasticsearch、Dubbo框架内部都采用了Netty。
Netty具备如下优点:
2.1 传统阻塞I/O服务模型
采用阻塞IO模式获取输入的数据,每个连接都需要独立的线程完成数据的输入,业务处理和数据返回工作。
存在问题:
2.2 Reactor模型
Reactor模式,通过一个或多个输入同时传递给服务处理器的模式,服务端程序处理传入的多个请求,将他们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式。Reactor模式使用IO复用监听模式,收到事件后,分发给某个线程,这点就是网络服务器高并发处理关键。
1)单Reactor单线程
优点:
缺点:
2)单Reactor多线程
优点:
缺点:
3)主从Reactor多线程
优点:
缺点:
这种模式被广泛使用,包括Nginx、Memcached、Netty等。这种模式也叫做1+M+N线程模式,即使用该模式开发的服务器包含1个(或多个,1只是表示相对较少)连接建立线程+M个IO线程+N个业务处理线程。
2.3 Netty线程模型
Netty中我们使用最多的还是主从Reactor线程模型。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.72.Final</version>
</dependency>
public class DemoNettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup=new NioEventLoopGroup(1);
//不填写线程数,默认是2*处理器线程数
EventLoopGroup workerGroup=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
//设置服务端通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列中等待连接个数
.option(ChannelOption.SO_BACKLOG,128)
//设置活跃状态,child是设置wokerGroup
.childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
//创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DemoNettyServerHandle());
}
});
//启动服务端并绑定端口,将异步改为同步
ChannelFuture channelFuture = bootstrap.bind(9999).sync();
System.out.println("服务端启动成功");
//关闭通道(不是真正意义上的关闭,而是监听通道关闭状态)
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public class DemoNettyServerHandle implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 通道读取事件
* @param channelHandlerContext
* @param o
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf=(ByteBuf)o;
System.out.println("客户端发来消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 读取完毕事件
* @param channelHandlerContext
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端",CharsetUtil.UTF_8));
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 异常发生事件
* @param channelHandlerContext
* @param throwable
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
throwable.printStackTrace();
channelHandlerContext.close();
}
}
public class DemoNettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new DemoNettyLientHandle());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class DemoNettyLientHandle implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
/**
* 通道就绪事件
* @param channelHandlerContext
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是客户端",CharsetUtil.UTF_8));
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf=(ByteBuf)o;
System.out.println("服务端发来消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
}
}
书山有路勤为径,学海无涯苦作舟
我正在尝试使用 Netty 构建一个反向代理,并且我想保留一个到后端服务器的开放套接字池,而不是每个传入套接字都需要一个从反向代理到后端服务器的新套接字。 你能用 Netty 做到这一点吗?如何? 谢
从 Netty 3.5.x 到 Netty 4 性能提升了多少?有数据吗? 最佳答案 目前没有太大改进。不过,它的 GC 开销要少得多。一旦实现缓冲池,我相信吞吐量也会变得更好。目前,吞吐量增益约为
我正在尝试关闭与它建立连接的 Netty 服务器,但它只是挂起。这就是我所做的。 在一台机器上启动服务器,在另一台机器上启动客户端。 从客户端向服务器发送一条消息,我得到响应。 使用 Ctrl-C 关
doc说“每个轮子的默认滴答数(即轮子的大小)是 512。如果你要安排很多超时,你可以指定一个更大的值。” 这是否意味着默认情况下它只能处理 512 次超时?如果我想要 25 秒的 10 万次超时(对
我正在使用 netty 4.0.25Final 编写一个 netty HTTP 服务器。我需要根据 HTTP GET 请求中的一些参数在管道中添加各种处理程序。 pipeline.addLast(ne
我现在将 Netty 用于一些服务器端组件有一段时间了,我对此感到非常满意。因此,为了我自己的方便,我还想在客户端使用它,但我想保持小程序的占用空间(在这种情况下)尽可能小。我需要从 Netty 那里
有没有办法告诉 netty 停止监听和接受套接字上的新连接,但要完成当前连接上的任何正在进行的工作? 最佳答案 您可以关闭 ServerSocketChannel创建者 ChannelFactory
我用响应式(Reactive) mongo 创建了简单的 Webflux (kotlin) 应用程序。 Controller 有一个 GET 方法,即返回 Flow(来自一个集合的 2 个对象)。 我
我有一个新项目,我将第一次使用 Netty (v4.0.4)。我将拥有一个拥有数万个连接客户端的服务器。服务器将向这些客户端发送命令并应该接收响应。 查看 API 和在线示例,我不确定如何从服务器的角
与 boost.asio 不同,netty 没有类似 read 的方法。以下情况不方便:管理节点管理一些节点,客户端连接到管理节点以检索驻留在节点中的信息。当管理节点收到客户端的请求后,向对应的节点发
我正在编写一个应用程序,其中客户端和服务器都是使用 Netty 编写的,并且服务器应该(显然)同时支持多个客户端。我试图通过创建 1000 个客户端共享一个 EventLoopGroup 并在一台机器
如果我在 Netty 101 期间睡着了,请原谅我,但我想知道是否有一种“正确”的方式来等待 Netty 完成多步骤连接过程。假设我有一个应用程序,其过程如下所示: 打开实际连接。 执行 TLS 握手
将 Netty ChannelBuffer 转换为 String 就像在 ChannelBuffer 上调用 .toString(UTF_8) 一样简单。如何从字符串创建 ChannelBuffer?
在 Netty 3 中,我们可以这样做: Channel.setReadable(false); Channel.setReadable(true); 我读了: http://netty.io/new
我知道 Storm 现在运行在 Netty 上用于节点之间的通信? Apache Spark 是否也使用 Netty?如果真是这样,那么是以哪种方式? 最佳答案 Spark使用Akka Actor进行
很难说出这里问的是什么。这个问题是模棱两可的、模糊的、不完整的、过于宽泛的或修辞的,无法以目前的形式得到合理的回答。如需帮助澄清这个问题以便重新打开它,visit the help center .
我真的很困惑老板组的线程数。我想不出我们需要多个老板线程的情况。在 do we need more than a single thread for boss group? Netty 的创建者说,如
我已将其添加到我的管道中,并且 LoggingHandler 正在捕获其事件,但是由于事件系统从 Netty 3 更改为 4,我该如何处理这些事件,因为 IdleStateAwareUpstreamH
有没有办法在 channel 上保持状态。我正在编写一个聊天服务器,我想保留有关 channel 所属用户的信息。我在想也许 Channel 会提供一种方法来存储用户对象,但我看不到。有没有办法在不需
我有一个 netty channel ,我想在底层套接字上设置超时(默认设置为 0 )。 超时的目的是,如果 15 分钟内没有发生任何事情,则未使用的 channel 将被关闭。 虽然我没有看到任何配
我是一名优秀的程序员,十分优秀!