gpt4 book ai didi

java - 如何从异步回调创建多个 Flux

转载 作者:行者123 更新时间:2023-11-30 05:48:37 24 4
gpt4 key购买 nike

从 Reactor 的引用指南中,我了解到 Flux.create()可用于将 aysnc 回调转换为 Flux .

但是,有时回调有多种方法来接收多种类型的数据,假设我有如下代码:

asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
// consume state
}

@Override
public void onResultData(Result result) {
// consume result
}
});

如何将其转换为两个 react 流:Flux<State>Flux<Result>

最佳答案

一种方法是使用一些处理器,例如 DirectProcessor,您可以创建 2 个不同的处理器,并在事件时将项目发送到处理器并订阅处理器,但如果您仍然想使用 Flux.create,您可以这样做

    Flux<Object> objectFlux;

@Override
public void run(String... args) throws Exception {

objectFlux = Flux.create(objectFluxSink ->
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
objectFluxSink.next(state);
}

@Override
public void onResultData(Result result) {
objectFluxSink.next(state);
}
}));





}

public Flux<Result> getResult(){
return objectFlux.filter(o -> o instanceof Result)
.map(o -> ((Result)o));
}

public Flux<State> geState(){
return objectFlux.filter(o -> o instanceof State)
.map(o -> ((State)o));
}

我仍然认为使用处理器应该更干净,你不需要进行过滤和转换,但你需要有 2 个处理器像这样:

        DirectProcessor <Result> resultDirectProcessor = DirectProcessor.create();
DirectProcessor<State> stateDirectProcessor = DirectProcessor.create();
asrService.recognize(new Callback() {
@Override
public void stateChange(State state) {
stateDirectProcessor.onNext(state);
}

@Override
public void onResultData(Result result) {
resultDirectProcessor.onNext(result);
}
});

关于java - 如何从异步回调创建多个 Flux,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54395533/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com