gpt4 book ai didi

java - 防止 flux.bufferTimeout 超时后溢出

转载 作者:搜寻专家 更新时间:2023-10-31 20:31:38 26 4
gpt4 key购买 nike

我对响应式编程和 Reactor 比较陌生。我有一种情况,我想在我的流中 bufferTimeout 值,同时保持它在我的控制之下(没有无限制的请求),所以我可以手动请求成批的值。

以下示例对此进行了说明:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

Flux<Object> flux = Flux.generate(sink -> {
try {
sink.next(queue.poll(10, TimeUnit.DAYS));
}
catch (InterruptedException e) {}
});

BaseSubscriber<List<Object>> subscriber = new BaseSubscriber<List<Object>>() {
protected void hookOnSubscribe(Subscription subscription) {
// Don't request unbounded
}

protected void hookOnNext(List<Object> value) {
System.out.println(value);
}
};

flux.subscribeOn(parallel())
.log()
.bufferTimeout(10, ofMillis(200))
.subscribe(subscriber);

subscriber.request(1);

// Offer a partial batch of values
queue.offer(1);
queue.offer(2);
queue.offer(3);
queue.offer(4);
queue.offer(5);

// Wait for timeout, expect [1, 2, 3, 4, 5] to be printed
Thread.sleep(500);

// Offer more values
queue.offer(6);
queue.offer(7);
queue.offer(8);
queue.offer(9);
queue.offer(10);
Thread.sleep(1000);

这是输出:

[DEBUG] (main) Using Console logging
[ INFO] (main) onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[ INFO] (main) request(10)
[ INFO] (parallel-1) onNext(1)
[ INFO] (parallel-1) onNext(2)
[ INFO] (parallel-1) onNext(3)
[ INFO] (parallel-1) onNext(4)
[ INFO] (parallel-1) onNext(5)
[1, 2, 3, 4, 5]
[ INFO] (parallel-1) onNext(6)
[ INFO] (parallel-1) onNext(7)
[ INFO] (parallel-1) onNext(8)
[ INFO] (parallel-1) onNext(9)
[ INFO] (parallel-1) onNext(10)
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

我实际上预料到了,因为我知道缓冲区订阅者将向上游请求 10 个值,它不知道超时并且无论如何都会生成所有这些值。由于一旦超时就完成了唯一请求,所以后面提供的值仍然会产生并溢出。

我想知道是否有可能在超时完成后阻止剩余值的产生,或者在不失去控制的情况下缓冲它们。我试过:

  • limitRate(1)bufferTimeout 之前,尝试使缓冲区请求值“按需”。它确实会逐一请求,但是请求了 10 次,因为缓冲区请求了 10 个值。
  • onBackpressureBuffer(10),因为如果我做对了,问题基本上就是背压的定义。试图缓冲超时请求的溢出值,但这请求了无限值,我想避免这种情况。

看来我必须实现另一个 bufferTimeout 实现,但我听说编写发布者很难。我错过了什么吗?还是我 react 错误?

最佳答案

通过实现我自己的订阅者解决了这个问题:

https://gist.github.com/hossomi/5edf60acb534a16c025e12e4e803d014

它只请求所需数量的值,并在没有 Activity 请求时缓冲接收到的值。缓冲区是无界的,因此可能需要谨慎使用或更改它。

很可能不如标准 Reactor 订阅者可靠,但对我有用。欢迎提出建议!

关于java - 防止 flux.bufferTimeout 超时后溢出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54151419/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com