
java - How to convert a List<T> to a Flux<T> using Reactor 3.x

Reposted. Author: 行者123. Updated: 2023-12-04 09:01:09

I have an asynchronous call to a Thrift interface:

public CompletableFuture<List<Long>> getFavourites(Long userId) {
    CompletableFuture<List<Long>> future = new CompletableFuture<>();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    // Bridge the Thrift callback to the future.
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
        // Fail the future as well; otherwise callers wait until their own timeout.
        future.completeExceptionally(e);
    }
    return future;
}
Right now it returns a CompletableFuture. I then call it and use a Flux to do some processing:
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random() * 100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}
However, I would like to get a Flux from the getFavourites method so that I can use it in the getRecommend method.
Alternatively, can you recommend a Flux API that converts List<Long> recommendList into Flux<Long> recommendFlux?

Best answer

To convert a CompletableFuture<List<T>> into a Flux<T>, you can use Mono#fromFuture with Mono#flatMapMany:

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
        CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);
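Applied to the question's getRecommend, the same two operators can replace the blocking get(2, TimeUnit.SECONDS) call. The following is only a minimal sketch under stated assumptions: it needs reactor-core on the classpath and Java 16+ for the record, the Product record and the getFavourites stub are hypothetical stand-ins for the question's types, and publishOn(mdpScheduler) is left out because that scheduler is not shown in the question.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RecommendSketch {

    // Hypothetical stand-in for the question's Product (the original uses a builder).
    record Product(Long userId, Long productId, int productType) {}

    // Hypothetical stand-in for getFavourites: completes on another thread after 100 ms.
    static CompletableFuture<List<Long>> getFavourites(Long userId) {
        return CompletableFuture.supplyAsync(
                () -> List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L),
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    static Flux<Product> getRecommend(Long userId) {
        // Mono.defer postpones the remote call until someone subscribes.
        return Mono.defer(() -> Mono.fromFuture(getFavourites(userId)))
                // Replaces get(2, TimeUnit.SECONDS): a late result becomes an error signal.
                .timeout(Duration.ofSeconds(2))
                .flatMapMany(Flux::fromIterable)
                .map(id -> new Product(userId, id, (int) (Math.random() * 100)))
                .take(5);
    }

    public static void main(String[] args) {
        // block() is used only to keep the demo alive until the result arrives.
        List<Product> products = getRecommend(42L).collectList().block();
        System.out.println(products);
    }
}
```

With this shape getRecommend no longer needs to declare InterruptedException, ExecutionException or TimeoutException, since a failure or timeout travels through the Flux as an error signal.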
A List<T> received asynchronously in a callback can also be converted into a Flux<T> without using a CompletableFuture.
You can use Mono#create with Mono#flatMapMany directly:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            sink.success(list);
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);
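Callback and client are not defined in the snippet, so here is a runnable sketch of the Mono#create approach with both filled in. The Callback interface, the Client class and its behaviour of failing for the query "bad" are hypothetical stand-ins, not a real API; reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MonoCreateSketch {

    // Hypothetical callback contract with the same shape as in the answer.
    interface Callback<T> {
        void onResult(T result);
        void onError(Exception e);
    }

    // Hypothetical client: replies on another thread and fails for the query "bad".
    static class Client {
        void call(String query, Callback<List<Long>> callback) {
            CompletableFuture.runAsync(() -> {
                if ("bad".equals(query)) {
                    callback.onError(new IllegalStateException("query failed"));
                } else {
                    callback.onResult(List.of(1L, 2L, 3L));
                }
            });
        }
    }

    static Flux<Long> query(Client client, String query) {
        return Mono.<List<Long>>create(sink -> client.call(query, new Callback<List<Long>>() {
            @Override
            public void onResult(List<Long> list) {
                sink.success(list); // emits the list and completes the Mono
            }

            @Override
            public void onError(Exception e) {
                sink.error(e); // forwards the failure as an error signal
            }
        })).flatMapMany(Flux::fromIterable);
    }

    public static void main(String[] args) {
        Client client = new Client();
        // prints [1, 2, 3]
        System.out.println(query(client, "query").collectList().block());
        // The failure is an ordinary error signal, so an operator can handle it; prints [-1]
        System.out.println(query(client, "bad")
                .onErrorResume(e -> Flux.just(-1L))
                .collectList()
                .block());
    }
}
```

Because the callback is registered inside Mono#create, the client is only called when something subscribes, and each subscription triggers a new call.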
Or simply use Flux#create, emitting every element in a single pass:
Flux<Long> flux = Flux.create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            list.forEach(sink::next);
            // Signal completion; without it the Flux never terminates.
            sink.complete();
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
});

flux.subscribe(System.out::println);
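One detail that matters with Flux#create is the terminal signal: unlike MonoSink#success, FluxSink#next does not complete the sequence, so the sketch below calls sink.complete() once the list has been emitted. It is a minimal, self-contained illustration, assuming reactor-core on the classpath; the Callback interface and Client class are hypothetical stand-ins for whatever callback API is actually in use.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Flux;

class FluxCreateSketch {

    // Hypothetical callback contract with the same shape as in the answer.
    interface Callback<T> {
        void onResult(T result);
        void onError(Exception e);
    }

    // Hypothetical client that delivers its result on another thread.
    static class Client {
        void call(String query, Callback<List<Long>> callback) {
            CompletableFuture.runAsync(() -> callback.onResult(List.of(1L, 2L, 3L, 4L, 5L)));
        }
    }

    static Flux<Long> query(Client client, String query) {
        return Flux.<Long>create(sink -> client.call(query, new Callback<List<Long>>() {
            @Override
            public void onResult(List<Long> list) {
                list.forEach(sink::next); // one onNext per element
                sink.complete();          // terminal signal for downstream operators
            }

            @Override
            public void onError(Exception e) {
                sink.error(e);
            }
        }));
    }

    public static void main(String[] args) {
        // collectList() only emits after completion, so this would hang without sink.complete().
        // prints [1, 2, 3, 4, 5]
        System.out.println(query(new Client(), "query").collectList().block());
    }
}
```

Operators that wait for the end of the sequence, such as collectList, count or then, depend on that completion signal.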

Regarding java - How to convert a List<T> to a Flux<T> using Reactor 3.x, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63556833/
