gpt4 book ai didi

java - Netty客户端没有收到Server发送的完整数据

转载 作者:行者123 更新时间:2023-12-02 10:33:58 26 4
gpt4 key购买 nike

我正在设计一个基于 Netty 的解决方案,通过 TCP 将文件从服务器传输到客户端。客户端指定文件的位置,然后服务器将文件发送给客户端。

目前,该解决方案适用于小文件(< 2MB 数据)。

如果要发送的文件大于~5MB,仅发送部分数据,并且情况会有所不同(每次发送的数据量不同)。另外,从日志中可以看出服务器已经发送了完整的数据量(文件)。

问题是客户端未收到服务器发送的完整数据。我的下面的代码有什么问题?或者有人能指出我正确的方向吗?

下面是我的客户端、服务器及其处理程序:(为了简洁起见,我只列出了重要的方法)

客户:

 public class FileClient {

private final static int PORT = 8992;
private final static String HOST = "127.0.0.1";

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

private SslContext sslContext = null;
private String srcFile = "";
private String destFile = "";

public ClientChannelInitializer(String srcFile, String destFile, SslContext sslCtx) {
this.sslContext = sslCtx;
this.srcFile = srcFile;
this.destFile = destFile;
}

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(sslContext.newHandler(socketChannel.alloc(), HOST, PORT));
pipeline.addLast("clientHandler", new FileClientHandler(srcFile, destFile));
}

}

private void startUp(String srcFile, String destFile) throws Exception {
SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
EventLoopGroup workerGroup = new NioEventLoopGroup();

Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(workerGroup);
clientBootstrap.channel(NioSocketChannel.class);
clientBootstrap.option(ChannelOption.TCP_NODELAY, true);
clientBootstrap.handler(new LoggingHandler(LogLevel.INFO));
clientBootstrap.handler(new ClientChannelInitializer(srcFile, destFile, sslCtx));

Channel channel = clientBootstrap.connect(new InetSocketAddress(HOST, PORT)).sync().channel();
channel.closeFuture().sync();
}
}

public static void main(String[] args) throws Exception {

String src = "/Users/home/src/test.mp4";
String dest = "/Users/home/dest/test.mp4";
new FileClient().startUp(src, dest);
}

}

客户端处理程序:

public class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


private final String sourceFileName;
private OutputStream outputStream;
private Path destFilePath;
private byte[] buffer = new byte[0];



public FileClientHandler(String SrcFileName, String destFileName) {
this.sourceFileName = SrcFileName;
this.destFilePath = Paths.get(destFileName);
System.out.println("DestFilePath-" + destFilePath);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(ToByteBuff(this.sourceFileName));
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuff) throws Exception {
if (this.outputStream == null) {
Files.createDirectories(this.destFilePath.getParent());
if (Files.exists(this.destFilePath)) {
Files.delete(this.destFilePath);
}
this.outputStream = Files.newOutputStream(this.destFilePath, StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
}

int size = byteBuff.readableBytes();
if (size > this.buffer.length) {
this.buffer = new byte[size];
}
byteBuff.readBytes(this.buffer, 0, size);
this.outputStream.write(this.buffer, 0, size);

}

文件服务器:

public class FileServer {
private final int PORT = 8992;

public void run() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(sslCtx.newHandler(ch.alloc()));

pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new FilServerFileHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();

f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new FileServer().run();
}
}

文件服务器处理程序:

public class FilServerFileHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buff) throws Exception {
String filePathStr = byteBuf.toString(CharsetUtil.UTF_8);

File file = new File(filePathStr);
RandomAccessFile raf = null;
ChannelFuture sendFileFuture;
try {
raf = new RandomAccessFile(file, "r");

sendFileFuture = ctx.writeAndFlush(new ChunkedNioFile(raf.getChannel()),
ctx.newProgressivePromise());

sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("Transfer complete.");
}

public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
throws Exception {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
});

} catch (FileNotFoundException fnfe) {
} finally {
if (raf != null)
raf.close();
}
}

我查过SO Q1SO Q2

最佳答案

通过在FilServerFileHandler中进行一些调整解决了您的问题:

public class FileServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buff) throws Exception {
String filePathStr = buff.toString(CharsetUtil.UTF_8);

File file = new File(filePathStr);
RandomAccessFile raf = new RandomAccessFile(file, "r");
ChannelFuture sendFileFuture;
try {
sendFileFuture = ctx.writeAndFlush(new ChunkedNioFile(raf.getChannel()), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("Transfer complete.");
if (raf != null) {
raf.close();
}
}
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
throws Exception {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
});
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
}

我将 raf.close() 移至 operationComplete 方法中。

部分传输是由于写操作期间关闭raf造成的。请注意,ctx.writeAndFlush 是一个异步调用,因此 finally block 中的 raf.close() 可能会在写入操作完成之前被触发,特别是当文件大小足够大时。

关于java - Netty客户端没有收到Server发送的完整数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53436833/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com