gpt4 book ai didi

java - 如何让 Reactor Core 的 Flux 缓存工作

转载 作者:行者123 更新时间:2023-11-30 10:27:53 26 4
gpt4 key购买 nike

我无法理解 react 堆核心的通量缓存是如何工作的。

但是,也许这个问题的答案可能是微不足道的,或者我的示例代码是彻头彻尾的愚蠢,但在我的研究之后,缓存如何与 Flux 一起工作在代码示例/博客/教程中没有显示。也许任何人都可以帮助我让它运行起来,这样其他人也有一个功能性的 Flux 缓存示例作为样板。

所以,我有以下代码:

// The source with cache:
Flux<Instant> values
= Flux
.generate((SynchronousSink<Instant> it) -> {
it.next(Instant.now());
waitingForMillis(50);
})
.take(5)
.cache(Duration.ofMillis(500)); // 500ms per item


// 1st time:
System.out.println("Printing data the 1st time, then the cache should be filled:");
values.subscribe(System.out::println);

System.out.printf("%nWaiting ...%n");
waitingForMillis(1000);

// 2nd time:
System.out.println("Printing data the 2nd time, still the data from the cache should be printed:");
values.subscribe(System.out::println);

System.out.printf("%nWaiting ...%n");
waitingForMillis(30_000);

// 3rd time:
System.out.println("Printing data the 3rd time, now, new data should be printed (be generated), because the cache timed out:");
values.subscribe(System.out::println);

我想到的场景:

实际上,我认为缓存的 Flux 值将在第 1 次和第 2 次为其订阅者提供缓存值。 - 第 3 次,在 30000 毫秒过去后,我认为该值将为订阅者提供新值,因为缓存超时。

经过我的解释后实际发生了什么:

缓存永远不会超时,并且始终向后续订阅者提供初始值。

问题(是的,多个问题):

我做错了什么?那个例子中有什么思考/解释错误?在我想到的场景之后,如何让这段代码运行以实现缓存?

最佳答案

以下代码按您的预期工作。我将生成器函数中的内部等待更改为使用 delayElements 方法并延长了缓存时间。 (它不是每个元素,它是每个元素的生命周期。)我测试了 cachetake 两种方式,任何一种方式都可以正常工作。

Flux<Instant> values =
Flux.generate(
(SynchronousSink<Instant> it) -> {
it.next(Instant.now());
})
.delayElements(Duration.ofMillis(50))
.cache(Duration.ofMillis(5000))
.take(5);

// 1st time:
System.out.println("Printing data the 1st time, then the cache should be filled:");
values.subscribe(System.out::println);

System.out.printf("%nWaiting ...%n");
waitingForMillis(1000);

// 2nd time:
System.out.println(
"Printing data the 2nd time, still the data from the cache should be printed:");
values.subscribe(System.out::println);

System.out.printf("%nWaiting ...%n");
waitingForMillis(30_000);

// 3rd time:
System.out.println(
"Printing data the 3rd time, now, new data should be printed (be generated), because the cache timed out:");
values.subscribe(System.out::println);

上面的输出是:

Printing data the 1st time, then the cache should be filled:

Waiting ...
2018-05-07T10:03:16.316Z
2018-05-07T10:03:16.412Z
2018-05-07T10:03:16.465Z
2018-05-07T10:03:16.517Z
2018-05-07T10:03:16.569Z
Printing data the 2nd time, still the data from the cache should be printed:
2018-05-07T10:03:16.316Z
2018-05-07T10:03:16.412Z
2018-05-07T10:03:16.465Z
2018-05-07T10:03:16.517Z
2018-05-07T10:03:16.569Z

Waiting ...
Printing data the 3rd time, now, new data should be printed (be generated), because the cache timed out:
2018-05-07T10:03:47.352Z
2018-05-07T10:03:47.403Z
2018-05-07T10:03:47.458Z
2018-05-07T10:03:47.511Z
2018-05-07T10:03:47.565Z

关于java - 如何让 Reactor Core 的 Flux 缓存工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45044565/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com