gpt4 book ai didi

java - 如何创建Reactor Netty热流

转载 作者:太空宇宙 更新时间:2023-11-04 10:06:12 24 4
gpt4 key购买 nike

我正在尝试找到一种创建热流的方法,我可以在其中在一个方法中插入数据,而订阅者可以在另一种方法中获取数据。我已经成功使用 WorkQueueProcessor ,但我不确定这是否是正确的方法。是否可以使用 Flux.create 做同样的事情?这是我的工作片段:

  1. 调用connect();
  2. 向服务器发送字节数据,客户端将收到 tcp 服务器的响应,workQueueProcessor 将发出数据。

    @Component
    @RequiredArgsConstructor
    public class TcpCli {

    @Setter
    private Connection connection;
    private NettyOutbound out;

    //Creation of Work Queue Processor, can a Flux.create here can do the same job ?
    private WorkQueueProcessor<String> workQueueProcessor = WorkQueueProcessor.<String>builder().build();
    public Mono<? extends Connection> connect() {
    return TcpClient.create()
    .host(tcpConfig.getHost())
    .port(tcpConfig.getPort())
    .handle(this::handleConnection)
    .connect();
    }
    public Mono<String> sendData(ByteArray data) {
    out.sendByteArray(Mono.just(data)).then().subscribe();

    //Get emitted data from workQueueProcessor
    return workQueueProcessor.next();
    }
    private Publisher<Void> handleConnection(NettyInbound in, NettyOutbound out) {
    this.out = out;
    in.receive().asString()
    .log("In received")
    .subscribe(str -> {
    LOGGER.info(String.format("Inbound: %s", str));

    //Emit data to workQueueProcessor
    workQueueProcessor.onNext(str);
    });
    return out
    .neverComplete() //keep connection alive
    .log("Never close");
    }
    }

最佳答案

如果您有多个输入源和多个订阅者,我认为您的做法是正确的。

如果 NettyInbound 是您 Steam 的唯一输入源,那么您不需要使用任何处理器。只需订阅即可。

如果您的流有多个输入源,并且只有一个订阅者(在您的情况下为 NettyOutbound),您可以尝试轻量级的“UnicastProcessor”。

关于java - 如何创建Reactor Netty热流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52893080/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com