gpt4 book ai didi

java - Reactor EmitterProcessor 只保留最后 n 个元素?

转载 作者:行者123 更新时间:2023-11-30 07:42:37 29 4
gpt4 key购买 nike

如何创建只保留最新 n 个元素的EmitterProcessor,这样即使没有订阅者它也能正常工作?

目前我创建了一个这样的处理器:

EmitterProcessor<Integer> processor = EmitterProcessor.create();

外部系统全天随机提供温度更新。在该系统的回调中,我执行以下操作:

void tempConsumer(int temp) {
processor.onNext(temp);
}

但是,一旦添加了 processor.getBufferSize() 元素,onNext(...) 就会阻塞。

如何创建一个处理器来丢弃最旧的元素,在这种情况下,而不是阻塞?

reactor-core #763 似乎在某种程度上涵盖了这一点。 Simon Baslé 首先讨论了 EmitterProcessorproposed change,这样当“在没有订阅者的情况下发送数据 [并且] 队列包含 bufferSize 元素时,最旧的元素被丢弃并且 onNext 已排队。”但在接下来的评论中,他说“我们不会继续执行上面我建议的更改。我们建议您使用 sink() 而不是直接使用 onNext。也就是说,在 sink() 中使用 onRequest 回调来执行与那里一样多的 sink.next(...)是请求。”

但是,如果我理解正确的话,这只涵盖了您可以按需计算新元素的情况,例如像这样:

FluxSink<Integer> sink = processor.sink();
Random random = new Random();

sink.onRequest(n -> random.nextInt()); // Generate next n requested elements.

但在我的情况下,我无法生成按需生成最新的 n 个温度读数。当然,我可以维护自己的最新读数的外部有界缓冲区,然后在 onRequest(...) 中读取它,但我假设 Reactor 可以为我做这件事?

我认为这个问题是一个重复的问题 - 但我的 Google foo 在这里让我失望了。


Ricard Kollcaku 的 answer 认为应该使用 ReplayProcessor 似乎是做事的正确方法。这是我写的另一个示例,目的是让我清楚地了解如何使用它:

ReplayProcessor<Integer> flux = ReplayProcessor.create(Queues.SMALL_BUFFER_SIZE);
FluxSink<Integer> sink = flux.sink();

// ReplayProcessor.getBufferSize() returns unbounded,
// while CAPACITY returns the capacity of the underlying buffer.
int capacity = flux.scan(Scannable.Attr.CAPACITY);

// Add twice as many elements as the underlying buffer can take.
int count = capacity * 2;

for (int i = 0; i < count; i++) {
sink.next(i);
}

// If `capacity` is 256, this will print value 256 thru to 511.
flux.subscribe(System.out::println);

我还发现 this section ,在 Hands-On Reactive Programming with Reactor 中,在解释事物时很有用。

最佳答案

你必须像这个例子一样使用ReplayProcessor:

 ReplayProcessor<Integer> directProcessor = ReplayProcessor.cacheLast();

Flux.range(1, 10)
.map(integer -> {
directProcessor.onNext(integer);
return integer;
}).doOnComplete(() -> {
directProcessor.subscribe(System.out::println);
directProcessor.subscribe(System.out::println);
})
.subscribe();

关于java - Reactor EmitterProcessor 只保留最后 n 个元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54404986/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com