我正在尝试找到一种创建热流的方法,我可以在其中在一个方法中插入数据,而订阅者可以在另一种方法中获取数据。我已经成功使用 WorkQueueProcessor ,但我不确定这是否是正确的方法。是否可以使用 Flux.create 做同样的事情?这是我的工作片段:
- 调用
connect();
向服务器发送字节数据,客户端将收到 tcp
服务器的响应,workQueueProcessor
将发出数据。
@Component
@RequiredArgsConstructor
public class TcpCli {
@Setter
private Connection connection;
private NettyOutbound out;
//Creation of Work Queue Processor, can a Flux.create here can do the same job ?
private WorkQueueProcessor<String> workQueueProcessor = WorkQueueProcessor.<String>builder().build();
public Mono<? extends Connection> connect() {
return TcpClient.create()
.host(tcpConfig.getHost())
.port(tcpConfig.getPort())
.handle(this::handleConnection)
.connect();
}
public Mono<String> sendData(ByteArray data) {
out.sendByteArray(Mono.just(data)).then().subscribe();
//Get emitted data from workQueueProcessor
return workQueueProcessor.next();
}
private Publisher<Void> handleConnection(NettyInbound in, NettyOutbound out) {
this.out = out;
in.receive().asString()
.log("In received")
.subscribe(str -> {
LOGGER.info(String.format("Inbound: %s", str));
//Emit data to workQueueProcessor
workQueueProcessor.onNext(str);
});
return out
.neverComplete() //keep connection alive
.log("Never close");
}
}
如果您有多个输入源和多个订阅者,我认为您的做法是正确的。
如果 NettyInbound 是您 Steam 的唯一输入源,那么您不需要使用任何处理器。只需订阅即可。
如果您的流有多个输入源,并且只有一个订阅者(在您的情况下为 NettyOutbound),您可以尝试轻量级的“UnicastProcessor”。
我是一名优秀的程序员,十分优秀!