- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
流经网络的数据总是具有相同的类型:字节,这些字节如何传输主要取决于我们所说的网络传输。用户并不关心传输的细节,只在乎字节是否被可靠地发送和接收
如果使用 Java 网络编程,你会发现,某些时候当你需要支持高并发连接,随后你尝试将阻塞传输切换为非阻塞传输,那么你会因为这两种 API 的截然不同而遇到问题。Netty 提供了一个通用的 API,这使得转换更加简单。
这里介绍仅使用 JDK API 来实现应用程序的阻塞(OIO)和非阻塞版本(NIO)
阻塞网络编程如下:
public class PlainOioServer {
public void server(int port) throws IOException {
// 将服务器绑定到指定端口
final ServerSocket socket = new ServerSocket(port);
try {
while (true) {
// 接收连接
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
// 创建一个新的线程来处理连接
new Thread(() -> {
OutputStream out;
try {
out = clientSocket.getOutputStream();
// 将消息写给已连接的客户端
out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
out.flush();
// 关闭连接x
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
这段代码可以处理中等数量的并发客户端,但随着并发连接的增多,你决定改用异步网络编程,但异步的 API 是完全不同的
非阻塞版本如下:
public class PlainNioServer {
public void server(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
// 将服务器绑定到选定的端口
ssocket.bind(address);
// 打开 Selector 来处理 Channel
Selector selector = Selector.open();
// 将 ServerSocket 注册到 Selector 以接受连接
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes());
while (true) {
try {
// 等待需要处理的新事件,阻塞将一直持续到下一个传入事件
selector.select();
} catch (IOException e) {
e.printStackTrace();
break;
}
Set<SelectionKey> readKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
// 检查事件是否是一个新的已经就绪可以被接受的连接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
// 接受客户端,并将它注册到选择器
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
// 检查套接字是否已经准备好写数据
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
// 将数据写到已连接的客户端
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
} catch (IOException exception) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
cex.printStackTrace();
}
}
}
}
}
}
可以看到,阻塞和非阻塞的代码是截然不同的。如果为了实现非阻塞而完全重写程序,无疑十分困难
使用 Netty 的阻塞网络处理如下:
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 使用阻塞模式
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new SimpleChannelInboundHandler<>() {
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(buf.duplicate())
.addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
而非阻塞版本和阻塞版本几乎一模一样,只需要改动两处地方
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);
传输 API 的核心是 interface Channel,它被用于所有的 IO 操作。每个 Channel 都将被分配一个 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了该 Channel 的所有配置设置,ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实例
除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法
方法名 | 描述 |
---|---|
eventLoop | 返回分配给 Channel 的 EventLoop |
pipeline | 返回分配给 Channel 的 ChannelPipeline |
isActive | 如果 Channel 活动的,返回 true |
localAddress | 返回本地的 SocketAddress |
remoteAddress | 返回远程的 SocketAddress |
write | 将数据写到远程节点 |
flush | 将之前已写的数据冲刷到底层传输 |
writeAndFlush | 等同于调用 write() 并接着调用 flush() |
Netty 内置了一些可开箱即用的传输,但它们所支持的协议不尽相同,因此你必须选择一个和你的应用程序所使用协议相容的传输
名称 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作为基础 |
Epoll | io.netty.channel.epoll | 由 JNI 驱动的 epoll() 和非阻塞 IO,可支持只有在 Linux 上可用的多种特性,比 NIO 传输更快,且完全非阻塞 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作为基础 |
Local | io.netty.channel.local | 可以在 VM 内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded 传输,允许使用 ChannelHandler 而不需要一个真正的基于网络的传输,主要用于测试 |
我正在尝试使用 Netty 构建一个反向代理,并且我想保留一个到后端服务器的开放套接字池,而不是每个传入套接字都需要一个从反向代理到后端服务器的新套接字。 你能用 Netty 做到这一点吗?如何? 谢
从 Netty 3.5.x 到 Netty 4 性能提升了多少?有数据吗? 最佳答案 目前没有太大改进。不过,它的 GC 开销要少得多。一旦实现缓冲池,我相信吞吐量也会变得更好。目前,吞吐量增益约为
我正在尝试关闭与它建立连接的 Netty 服务器,但它只是挂起。这就是我所做的。 在一台机器上启动服务器,在另一台机器上启动客户端。 从客户端向服务器发送一条消息,我得到响应。 使用 Ctrl-C 关
doc说“每个轮子的默认滴答数(即轮子的大小)是 512。如果你要安排很多超时,你可以指定一个更大的值。” 这是否意味着默认情况下它只能处理 512 次超时?如果我想要 25 秒的 10 万次超时(对
我正在使用 netty 4.0.25Final 编写一个 netty HTTP 服务器。我需要根据 HTTP GET 请求中的一些参数在管道中添加各种处理程序。 pipeline.addLast(ne
我现在将 Netty 用于一些服务器端组件有一段时间了,我对此感到非常满意。因此,为了我自己的方便,我还想在客户端使用它,但我想保持小程序的占用空间(在这种情况下)尽可能小。我需要从 Netty 那里
有没有办法告诉 netty 停止监听和接受套接字上的新连接,但要完成当前连接上的任何正在进行的工作? 最佳答案 您可以关闭 ServerSocketChannel创建者 ChannelFactory
我用响应式(Reactive) mongo 创建了简单的 Webflux (kotlin) 应用程序。 Controller 有一个 GET 方法,即返回 Flow(来自一个集合的 2 个对象)。 我
我有一个新项目,我将第一次使用 Netty (v4.0.4)。我将拥有一个拥有数万个连接客户端的服务器。服务器将向这些客户端发送命令并应该接收响应。 查看 API 和在线示例,我不确定如何从服务器的角
与 boost.asio 不同,netty 没有类似 read 的方法。以下情况不方便:管理节点管理一些节点,客户端连接到管理节点以检索驻留在节点中的信息。当管理节点收到客户端的请求后,向对应的节点发
我正在编写一个应用程序,其中客户端和服务器都是使用 Netty 编写的,并且服务器应该(显然)同时支持多个客户端。我试图通过创建 1000 个客户端共享一个 EventLoopGroup 并在一台机器
如果我在 Netty 101 期间睡着了,请原谅我,但我想知道是否有一种“正确”的方式来等待 Netty 完成多步骤连接过程。假设我有一个应用程序,其过程如下所示: 打开实际连接。 执行 TLS 握手
将 Netty ChannelBuffer 转换为 String 就像在 ChannelBuffer 上调用 .toString(UTF_8) 一样简单。如何从字符串创建 ChannelBuffer?
在 Netty 3 中,我们可以这样做: Channel.setReadable(false); Channel.setReadable(true); 我读了: http://netty.io/new
我知道 Storm 现在运行在 Netty 上用于节点之间的通信? Apache Spark 是否也使用 Netty?如果真是这样,那么是以哪种方式? 最佳答案 Spark使用Akka Actor进行
很难说出这里问的是什么。这个问题是模棱两可的、模糊的、不完整的、过于宽泛的或修辞的,无法以目前的形式得到合理的回答。如需帮助澄清这个问题以便重新打开它,visit the help center .
我真的很困惑老板组的线程数。我想不出我们需要多个老板线程的情况。在 do we need more than a single thread for boss group? Netty 的创建者说,如
我已将其添加到我的管道中,并且 LoggingHandler 正在捕获其事件,但是由于事件系统从 Netty 3 更改为 4,我该如何处理这些事件,因为 IdleStateAwareUpstreamH
有没有办法在 channel 上保持状态。我正在编写一个聊天服务器,我想保留有关 channel 所属用户的信息。我在想也许 Channel 会提供一种方法来存储用户对象,但我看不到。有没有办法在不需
我有一个 netty channel ,我想在底层套接字上设置超时(默认设置为 0 )。 超时的目的是,如果 15 分钟内没有发生任何事情,则未使用的 channel 将被关闭。 虽然我没有看到任何配
我是一名优秀的程序员,十分优秀!