gpt4 book ai didi

java - 如何将轮询 api 转换为 react 流

转载 作者:行者123 更新时间:2023-12-02 01:25:15 25 4
gpt4 key购买 nike

假设我有一个具有以下签名的函数:

class Item {
String name;
Long id;
}
public Flux<Item> getNew(long id);

getNew() 返回在 id (0..N) 之后添加的项目流。那么如何将其变成无限流呢?

所以像这样:

public Flux<Item> observe(long id) {
return Flux.interval(Duration.ofSeconds(1)).
flatMap(counter -> getNew(id)); // <-- how to use last value from getNew flux as the new id
}

我能够做到这一点的唯一方法是使用某种类型的状态变量:

   public Flux<Long> observe(long id) {
final AtomicLong last = new AtomicLong(id);
return Flux.interval(Duration.ofSeconds(1)).
flatMap(l -> getNew(last.get())).
doOnNext(last::set);
}

有更惯用的方法吗?我尝试为此创建生成器,但我不知道如何实现它。

最佳答案

如果您可以通过检查来识别 getNew 发出的最后一个 Item,那么您可以使用 .expand运算符:

    public Flux<Item> observe(long id) {
return getNew(id)
.expand(item -> isLast(item)
? getNew(item.id)
: Flux.empty());
}
/**
* @return true if the given item is the last item emitted by getNew
*/
private boolean isLast(Item item) {
return // ... snip ...
}


如果您无法通过检查来识别最后一个Item,则必须使用状态变量。不过,我建议使用 .defer.repeat而不是 .interval...

    public Flux<Item> observe(long id) {
final AtomicLong nextStartId = new AtomicLong(id);
return Flux.defer(() -> getNew(nextStartId.get()))
.doOnNext(item -> nextStartId.set(item.id))
.repeat();
}

反对使用 .interval 的主要原因是因为:

If demand is not produced in time, an onError will be signalled

因此,如果 API 花费的时间太长,或者处理结果花费的时间太长,流将会以错误结束。对于较长的间隔,这可能不是问题,但对于相对较短的间隔(例如示例中的 1 秒),这可能是一个问题。

如果你想在每次重复迭代之前延迟,那么你可以使用.repeatWhen,以及reactor-extra的Repeat具有固定的退避。这将为您提供“固定延迟”语义,而不是“固定间隔”。

关于java - 如何将轮询 api 转换为 react 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57054753/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com