gpt4 book ai didi

netty - 当请求被写入代理 netty 服务器中的 outboundChannel 时,如何在同一个处理程序中获取响应 byteBuf

转载 作者:行者123 更新时间:2023-12-03 19:30:24 25 4
gpt4 key购买 nike

我正在实现 netty 代理服务器,如下所示:
一个http请求进来,

  • 如果本地缓存有数据,则写入 channel 并刷新
  • 如果没有,从远程服务器获取数据,将其添加到缓存并刷新

  • 我很难从与我写入客户端的相同处理程序中的响应中提取 byteBuf。

    在下面的示例中,如果您看到 channelRead HexDumpProxyFrontendHandler的方法,您将看到我如何从缓存中获取和写入。我在下面的方法中添加了评论,我遇到了困难

    此代码端到端工作。所以它可以在本地复制和测试。

    我可以看到 FullHttpResponse对象在 HexDumpProxyBackendhandler#channelRead .但是在这个方法中,我没有提到缓存,也没有我想在缓存中添加的 id。

    我认为有两种方法可以解决这个问题,但我不清楚如何做到这一点。

    1)要么在HexdumpProxyBackendHandler中获取缓存引用和id,那么它就变得容易了。但是 hexDumpBackendhanderchannelActive 中实例化的 HexDumpFrontendHandler此时我还没有解析我的传入请求

    2) 获取在 HexdumpFrontendHandler#dchannelRead中提取的响应bytebuf ,在这种情况下它只是缓存插入。

    HexDumpProxy.java
    public final class HexDumpProxy {

    static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
    static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
    static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
    static Map<Long,String> localCache = new HashMap<>();
    public static void main(String[] args) throws Exception {
    System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
    localCache.put(123L, "profile1");
    localCache.put(234L, "profile2");
    // Configure the bootstrap.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
    .childOption(ChannelOption.AUTO_READ, false)
    .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }

    }

    HexDumpProxyInitializer.java
    public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

    private final String remoteHost;
    private final int remotePort;
    private Map<Long, String> cache;

    public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache=cache;
    }

    @Override
    public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(
    new LoggingHandler(LogLevel.INFO),
    new HttpServerCodec(),
    new HttpObjectAggregator(8*1024, true),
    new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
    }

    }

    HexDumpProxyFrontendHandler.java
     public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
    private final String remoteHost;
    private final int remotePort;
    private Channel outboundChannel;
    private Map<Long, String> cache;

    public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
    this.remoteHost = remoteHost;
    this.remotePort = remotePort;
    this.cache = cache;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    final Channel inboundChannel = ctx.channel();

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
    .channel(ctx.channel().getClass())
    .handler((new ChannelInitializer() {
    protected void initChannel(Channel ch) {
    ChannelPipeline var2 = ch.pipeline();
    var2.addLast((new HttpClientCodec()));
    var2.addLast(new HttpObjectAggregator(8192, true));
    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
    }
    }))
    .option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
    // connection complete start to read first data
    inboundChannel.read();
    } else {
    // Close the connection if the connection attempt has failed.
    inboundChannel.close();
    }
    }
    });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    Long id = Long.valueOf(userId);
    if (cache.containsKey(id)){
    StringBuilder buf = new StringBuilder();
    buf.append(cache.get(id));
    writeResponse(req, ctx, buf);
    closeOnFlush(ctx.channel());
    return;
    }
    }
    if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
    // was able to flush out data, start to read the next chunk
    ctx.channel().read();
    } else {
    future.channel().close();
    }
    }
    });
    }

    //get response back from HexDumpProxyBackendHander and write to cache
    //basically I need to do cache.put(id, parse(response));
    //how to get response buf from inboundChannel here is the question I am trying to solve
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
    if (outboundChannel != null) {
    closeOnFlush(outboundChannel);
    }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    closeOnFlush(ctx.channel());
    }

    /**
    * Closes the specified channel after all queued write requests are flushed.
    */
    static void closeOnFlush(Channel ch) {
    if (ch.isActive()) {
    ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
    }

    //borrowed from HttpSnoopServerHandler.java in snoop example
    private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);
    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
    HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
    Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));

    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
    // Add 'Content-Length' header only for a keep-alive connection.
    response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    // Add keep alive header as per:
    // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Encode the cookie.
    String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString != null) {
    Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
    if (!cookies.isEmpty()) {
    // Reset the cookies if necessary.
    for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
    response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
    }
    }
    } else {
    // Browser sent no cookie. Add some.
    response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
    response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    }

    // Write the response.
    ctx.write(response);

    return keepAlive;
    }

    }

    HexDumpProxyBackendHandler.java
    public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {

    private final Channel inboundChannel;

    public HexDumpProxyBackendHandler(Channel inboundChannel) {
    this.inboundChannel = inboundChannel;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    ctx.read();
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof FullHttpResponse) {
    System.out.println("this is fullHttpResponse");
    }
    inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
    ctx.channel().read();
    } else {
    future.channel().close();
    }
    }
    });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
    HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
    }

    }

    P.S:我从 netty-example 中获取了大部分代码项目并定制它

    编辑

    根据 Ferrygig 的建议,我更改了 FrontEndChannelHander#channelRead 如下。我已经删除了 channelActive 并实现了 write 方法

    @覆盖
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    id = Long.valueOf(userId);
    if (cache.containsKey(id)){
    StringBuilder buf = new StringBuilder();
    buf.append(cache.get(id));
    writeResponse(req, ctx, buf);
    closeOnFlush(ctx.channel());
    return;
    }

    final Channel inboundChannel = ctx.channel();

    //copied from channelActive method

    // Start the connection attempt.
    Bootstrap b = new Bootstrap();
    b.group(inboundChannel.eventLoop())
    .channel(ctx.channel().getClass())
    .handler((new ChannelInitializer() {
    protected void initChannel(Channel ch) {
    ChannelPipeline var2 = ch.pipeline();
    var2.addLast((new HttpClientCodec()));
    var2.addLast(new HttpObjectAggregator(8192, true));
    var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
    }
    }));
    //.option(ChannelOption.AUTO_READ, false);
    ChannelFuture f = b.connect(remoteHost, remotePort);
    outboundChannel = f.channel();
    f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
    // connection complete start to read first data
    inboundChannel.read();
    } else {
    // Close the connection if the connection attempt has failed.
    inboundChannel.close();
    }
    }
    });
    }
    if (outboundChannel.isActive()) {
    outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
    if (future.isSuccess()) {
    // was able to flush out data, start to read the next chunk
    ctx.channel().read();
    } else {
    future.channel().close();
    }
    }
    });
    }

    最佳答案

    有多种方法可以解决这个问题,而最终目标的实现方式也各不相同。

    目前,您正在使用 1 个连接入站是 1 个连接出站的拓扑,这使得系统设计稍微容易一些,因为您不必担心将多个请求同步到同一个出站流。

    目前,您的前端处理程序扩展了 ChannelInboundHandlerAdapter ,这只会拦截进入您的应用程序的“数据包”,如果我们让它扩展 ChannelDuplexHandler ,我们还可以处理从应用程序出去的“数据包”。

    要接近这条路径,我们需要更新 HexDumpProxyFrontendHandler要扩展的类 ChannelDuplexHandler (我们暂时称其为 CDH)。

    该过程的下一步是覆盖 write 方法来自 CDH ,因此我们可以在后端向我们发送响应时进行拦截。

    在我们创建了 write 方法之后,我们需要通过调用 put 来更新我们的(非线程安全的)映射。方法。

    public class HexDumpProxyFrontendHandler extends ChannelDuplexHandler {
    Long lastId;
    // ...
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
    System.out.println("msg is instanceof httpRequest");
    HttpRequest req = (HttpRequest)msg;
    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
    String userId = queryStringDecoder.parameters().get("id").get(0);
    Long id = Long.valueOf(userId);
    lastId = id; // Store ID of last request
    // ...
    }
    // ...
    }
    // ...
    public void write(
    ChannelHandlerContext ctx,
    java.lang.Object msg,
    ChannelPromise promise
    ) throws java.lang.Exception {

    if (msg instanceof FullHttpResponse) {
    System.out.println("this is fullHttpResponse");
    FullHttpResponse full = (FullHttpResponse)msg;
    cache.put(lastId, parse(full)); // TODO: Include a system here to convert the request to a string
    }
    super.write(ctx, msg, promise);
    }
    // ...
    }

    我们还没有完成,虽然我们已经有了代码,但我们仍然需要修复代码中其他地方的一些错误。

    非线程安全映射(严重错误)

    这些错误之一是您使用普通哈希映射来处理缓存。这样做的问题是这不是线程安全的,如果多人同时连接到您的应用程序,可能会发生奇怪的事情,包括随着 map 内部结构的更新而导致完整的 map 损坏。

    为了解决这个问题,我们将把 map “升级”为 ConcurrentHashMap ,这个映射有特殊的结构来处理同时请求和存储数据的多个线程,而不会有巨大的性能损失。 (如果性能是一个主要问题,您可能会通过使用每线程哈希映射而不是全局缓存来获得更高的性能,但这意味着每个资源都可以缓存到线程数量。

    无缓存移除规则(主要错误)

    目前,没有代码来删除过时的资源,这意味着缓存将被填满,直到程序没有剩余内存,然后它会崩溃。

    这可以通过使用提供线程安全访问和所谓的删除规则的映射实现来解决,或者使用已经预制的缓存解决方案,如 Gnuava caches .

    无法正确处理 HTTP Pipelining(次要错误)

    HTTP 鲜为人知的特性之一是 pipelining ,这基本上意味着客户端可以向服务器发送另一个请求,而无需等待前一个请求的响应。这种类型的错误包括服务器交换两个请求的内容,甚至完全破坏它们。

    尽管随着越来越多的 HTTP2 支持以及那里存在损坏的服务器的知识,流水线请求现在很少见,但使用它的某些 CLI 工具仍然会发生这种情况。

    要解决此问题,请在发送上一个响应后仅读取请求,其中一种方法是保留请求列表,或者使用更高级的 pre-make solutions

    关于netty - 当请求被写入代理 netty 服务器中的 outboundChannel 时,如何在同一个处理程序中获取响应 byteBuf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55463519/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com