- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
编辑: 创建了 github 存储库:https://github.com/istiillyes/client-server-netty
我使用 netty 4.0.15.Final 创建了一个客户端-服务器,并使用 OIO 和 NIO 执行了一些测试。
我要发送一些大小不一的字符串 [1KB、10KB、100KB]。
我需要服务器和客户端能够并行发送消息,因此测试如下所示:
使用NIO,消息被传输,一切正常。
使用 OIO,服务器和客户端在一段时间后在 java.net.SocketOutputStream.wirte(byte[]) 中保持阻塞状态,并且永远不会返回。
知道为什么会这样吗?我使用 netty 的方式有问题吗?
我使用普通 Java 套接字进行了同样的测试,并且成功了。所以,我猜要么我没有正确使用 netty,要么这是一个错误。
我在此处添加了创建 channel 和 channel 处理程序的代码。
这是来自客户端的堆栈跟踪,使用 YourKit 捕获:
pool-1-thread-1 [RUNNABLE, IN_NATIVE]
java.net.SocketOutputStream.write(byte[])
io.netty.buffer.UnpooledUnsafeDirectByteBuf.getBytes(int, OutputStream, int)
io.netty.buffer.AbstractByteBuf.readBytes(OutputStream, int)
io.netty.channel.oio.OioByteStreamChannel.doWriteBytes(ByteBuf)
io.netty.channel.oio.AbstractOioByteChannel.doWrite(ChannelOutboundBuffer)
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()
io.netty.channel.AbstractChannel$AbstractUnsafe.flush()
io.netty.channel.DefaultChannelPipeline$HeadHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.flush()
io.netty.handler.logging.LoggingHandler.flush(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeFlush()
io.netty.channel.DefaultChannelHandlerContext.write(Object, boolean, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.writeAndFlush(Object)
io.netty.channel.DefaultChannelPipeline.writeAndFlush(Object)
io.netty.channel.AbstractChannel.writeAndFlush(Object)
client.ClientHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.handler.logging.LoggingHandler.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelHandlerContext)
io.netty.channel.DefaultChannelHandlerContext.invokeChannelActive()
io.netty.channel.DefaultChannelHandlerContext.fireChannelActive()
io.netty.channel.DefaultChannelPipeline.fireChannelActive()
io.netty.channel.oio.AbstractOioChannel$DefaultOioUnsafe.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline$HeadHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.ChannelDuplexHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.handler.logging.LoggingHandler.connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.invokeConnect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelHandlerContext.connect(SocketAddress, ChannelPromise)
io.netty.channel.DefaultChannelPipeline.connect(SocketAddress, ChannelPromise)
io.netty.channel.AbstractChannel.connect(SocketAddress, ChannelPromise)
io.netty.bootstrap.Bootstrap$2.run()
io.netty.channel.ThreadPerChannelEventLoop.run()
io.netty.util.concurrent.SingleThreadEventExecutor$2.run()
java.lang.Thread.run()
创建接受器 channel 的代码:
final class ServerChannelFactory {
private static final Logger LOGGER = Logger.getLogger(ServerChannelFactory.class);
protected static Channel createAcceptorChannel(
final ChannelType channelType,
final InetSocketAddress localAddress,
final ServerHandler serverHandler
) {
final ServerBootstrap serverBootstrap = ServerBootstrapFactory.createServerBootstrap(channelType);
serverBootstrap
.childHandler(new ServerChannelInitializer(serverHandler))
.option(ChannelOption.SO_BACKLOG, Resources.SO_BACKLOG);
try {
ChannelFuture channelFuture = serverBootstrap.bind(
new InetSocketAddress(localAddress.getPort())).sync();
channelFuture.awaitUninterruptibly();
if (channelFuture.isSuccess()) {
return channelFuture.channel();
} else {
LOGGER.warn(String.format("Failed to open socket! Cannot bind to port: %d!",
localAddress.getPort()));
}
} catch (InterruptedException e) {
LOGGER.error("Failed to create acceptor socket.", e);
}
return null;
}
private static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
private ChannelHandler serverHandler;
private ServerChannelInitializer(ChannelHandler serverHandler) {
this.serverHandler = serverHandler;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Encoders
ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));
// Decoders
ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));
// Handlers
ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
ch.pipeline().addLast(Resources.SERVER_HANDLER_NAME, serverHandler);
}
}
}
服务器处理程序:
final class ServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = Logger.getLogger(ServerHandler.class);
int noMessagesReceived = 0;
@Override
public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
for(int i=0; i< Resources.NO_MESSAGES_TO_SEND; i++) {
ctx.channel().writeAndFlush(MessageStore.getMessage(i));
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
noMessagesReceived++;
if(noMessagesReceived == Resources.NO_MESSAGES_TO_SEND) {
ctx.channel().writeAndFlush(MessageStore.getMessage(0));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
FileUtils.write(Resources.SERVER_OUTPUT, String.format("Received %d messages", noMessagesReceived));
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
}
}
服务器引导工厂:
public class ServerBootstrapFactory {
private ServerBootstrapFactory() {
}
public static ServerBootstrap createServerBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
switch (channelType) {
case NIO:
serverBootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup());
serverBootstrap.channel(NioServerSocketChannel.class);
return serverBootstrap;
case OIO:
serverBootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup());
serverBootstrap.channel(OioServerSocketChannel.class);
return serverBootstrap;
default:
throw new UnsupportedOperationException("Failed to create ServerBootstrap, " + channelType + " not supported!");
}
}
}
创建连接器 channel 的代码:
final class ClientChannelFactory {
private static final Logger LOGGER = Logger.getLogger(ClientChannelFactory.class);
protected static Channel createConnectorChannel(
ChannelType channelType,
final ClientHandler clientHandler,
InetSocketAddress remoteAddress
) {
final Bootstrap bootstrap = BootstrapFactory.createBootstrap(channelType);
bootstrap.handler(new ClientChannelInitializer(clientHandler));
try {
final ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(remoteAddress.getAddress(), remoteAddress.getPort()))
.sync();
channelFuture.awaitUninterruptibly();
if (channelFuture.isSuccess()) {
return channelFuture.channel();
} else {
LOGGER.warn(String.format(
"Failed to open socket! Cannot connect to ip: %s port: %d!",
remoteAddress.getAddress(), remoteAddress.getPort())
);
}
} catch (InterruptedException e) {
LOGGER.error("Failed to open socket!", e);
}
return null;
}
private static class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private ChannelHandler clientHandler;
private ClientChannelInitializer(ChannelHandler clientHandler) {
this.clientHandler = clientHandler;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Encoders
ch.pipeline().addLast(Resources.STRING_ENCODER_NAME, new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addBefore(Resources.STRING_ENCODER_NAME, Resources.FRAME_ENCODER_NAME,
new LengthFieldPrepender(Resources.FRAME_LENGTH_FIELD_SIZE));
// Decoders
ch.pipeline().addLast(Resources.STRING_DECODER_NAME, new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addBefore(Resources.STRING_DECODER_NAME, Resources.FRAME_DECODER_NAME,
new LengthFieldBasedFrameDecoder(Resources.MAX_FRAME_LENGTH,
Resources.FRAME_LENGTH_FIELD_OFFSET, Resources.FRAME_LENGTH_FIELD_SIZE,
Resources.FRAME_LENGTH_ADJUSTMENT, Resources.FRAME_LENGTH_FIELD_SIZE));
// Handlers
ch.pipeline().addLast(Resources.LOGGING_HANDLER_NAME, new LoggingHandler());
ch.pipeline().addLast(Resources.CLIENT_HANDLER_NAME, clientHandler);
}
}
}
客户端处理程序:
public final class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = Logger.getLogger(ClientHandler.class);
private int noMessagesReceived = 0;
@Override
public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws java.lang.Exception {
for(int i=0; i< Resources.NO_MESSAGES_TO_SEND; i++) {
ctx.channel().writeAndFlush(MessageStore.getMessage(i));
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
noMessagesReceived++;
if (noMessagesReceived > Resources.NO_MESSAGES_TO_SEND) {
ctx.channel().close();
}
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
FileUtils.write(Resources.CLIENT_OUTPUT, String.format("Received %d messages", noMessagesReceived));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error(String.format("Exception in %s", this.getClass().getName()), cause);
}
}
引导工厂:
public class BootstrapFactory {
private BootstrapFactory() {
}
public static Bootstrap createBootstrap(final ChannelType channelType) throws UnsupportedOperationException {
Bootstrap bootstrap = new Bootstrap();
switch (channelType) {
case NIO:
bootstrap.group(new NioEventLoopGroup());
bootstrap.channel(NioSocketChannel.class);
return bootstrap;
case OIO:
bootstrap.group(new OioEventLoopGroup());
bootstrap.channel(OioSocketChannel.class);
return bootstrap;
default:
throw new UnsupportedOperationException("Failed to create Bootstrap, " + channelType + " not supported!");
}
}
}
channel 类型:
public enum ChannelType {
// New IO - non-blocking
NIO,
// Old IO - blocking
OIO;
}
最佳答案
由于Netty的OIO transport是在同一个线程中进行读和写的,所以在写的过程中不会读。
问题是,如果客户端和服务器都使用 OIO 传输实现,它们最终可能会互相写入并等待对方读取它们正在写入的内容。
解决方法是 1) 至少在一侧使用 NIO,或者 2) 非常小心不要将对等方的套接字接收缓冲区填满到其最大大小。实际上,(2) 不是很容易实现,因此始终建议至少在服务器端使用 NIO 传输。
关于network-programming - Netty OIO 客户端-服务器在 java.net.SocketOutputStream.write(byte[]) 中仍然被阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21998670/
我有以下代码: foreach (byte b in bytes) { byte inv = byte.MaxValue - b; // Add the new value to a
我需要从这个文本文件source.txt中读取内容并将内容反向写入这个文本文件destination.txt。读取和写入必须使用逐字节完成! 我使用 BufferedReader 和 Buffered
我需要存储大量 RGB 颜色对象。对于某些常见用途,这些占用了我的应用程序总内存的 8% 到 12%。我目前将其定义如下: class MyColor { byte red; byte green;
我有一个由字节数组表示的整数。 byte[] result = getResult(); resultInt1 = Integer.parseInt(Bytes.toString(result));/
我正在尝试使用 Rusoto 库调用 AWS Lambda 函数。该请求有一个 JSON 编码的有效负载,我目前将其作为一个字符串,但该库为此坚持使用 bytes::bytes::Bytes 结构。我
我正在尝试基于 Tokio's example 编写一个 TCP 服务器. 当我尝试发送缓冲区时,编译器返回错误 0277。 我的代码:(playground) extern crate tokio;
我知道我可以通过 IList 进行枚举,例如: public byte[] ConvertToByteArray(IList> list) { IList newList = new List
考虑这样一个文本文件: Some text here. --- More text another line. --- Third part of text. 我想把它分成三部分,用---分隔符分开。
如果我有一个字节变量:byte b = 0; 为什么以下工作: b++; b += 1; // compiles ...但这不是吗? b = b + 1; // compile er
我有一个简单的字节数组,我想从中获取颜色。我的计划是用红色表示三位,绿色表示三位,蓝色表示两位。 8 位。 我认为颜色是正确的: 如有错误请指正 byte[] colours = new byte[
我的目标是比较两个字节数组中的两个字符串值。它实际上需要创建两个新的字符串对象才能使用 contains 方法。是选择正确还是有什么办法可以使用优化方式而不使用新的关键字。 if(new String
我正在使用github.com/tarm/serial来连接一些串行仪器。在开发过程中,我使用/dev/ttyp0和/dev/ptyp0对,其中go进程连接到一个,我使用screen连接到另一个。我编
好的,所以如果一个字节是 8 位,那么半字节就是 4 位。并且您可以将四分之一字节作为 2 位(尽管我想,如果有的话,它会被称为双位)。 虽然这是一致的,但如果我使用这个词,有人会感到困惑(或惊讶)吗
我在解释文件时遇到问题。文件构建如下: "name"-@-"date"-@-"author"-@-"signature" 签名是一个字节数组。当我读回文件时,我将其解析为 String 并拆分它: m
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 10 年前。 Improve thi
Java 让我很难过,因为它需要 ArrayList 的包装类秒。我将如何添加 byte[]到 ArrayList ? 最佳答案 LOL 认为我必须包装所有东西。 ArrayList作品。谢谢一晒。
我有一个 16 字节的 md5 散列,我需要使用 XOR 将其“折叠”成 4 字节数据:{1st 4 bytes} XOR {2nd 4 bytes} XOR {3rd 4 bytes} XOR {4
我正在学习SMSC smc91cx驱动代码,我学习了如何根据Application Note 9-6的说明编写smc91c111网卡的测试代码。 .我无法理解“传输数据包”下的以下说明: Write
我必须附加(可变数量的)字节数组。集合似乎只适用于包装类,即 Byte。大约 20 小时后,我想到了这个,并且它有效,但我想知道它是否可以改进(添加到列表,但欢迎任何其他改进建议:),即 Collec
我有两个基本相同的操作: insert_bytes(from, count) delete_bytes(start, stop) -> delete_bytes(from, count) insert
我是一名优秀的程序员,十分优秀!