gpt4 book ai didi

rx-java - 如何在完成时刷新 Rx 缓冲区?

转载 作者:行者123 更新时间:2023-12-02 14:39:47 28 4
gpt4 key购买 nike

我正在使用 RxJava,但也许能够翻译另一个实现的答案。

我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示:

observable
.buffer(10)
.map(items -> {
StringBuilder sb = new StringBuilder();
for (String item : items)
sb.append(item.document);
return sb.toString();
})
.subscribe(...)

效果很好。但我需要能够刷新不完整的缓冲区。现在,如果我的可观察对象在仅发出九个字符串后完成,我的订阅者将永远不会被调用。

有没有办法使用内置运算符来执行此操作,或者我是否必须创建一个自定义观察者来在监视 onComplete 时进行缓冲?

谢谢!

更新:经过进一步调查,问题出在上游,我有一些自定义的可观察对象在测试下工作正常,但在连接在一起时就损坏了。我基本上是这样做的:

Observable.never()
.mergeWith(Observable.range(1,8))
.buffer(5)

我还不太明白为什么,但是 never() 阻止了部分批处理的处理。然而,这非常有效:

Observable.empty()
.mergeWith(Observable.range(1,8))
.buffer(5)

我去学习一下。感谢您的帮助!

最佳答案

我不知道为什么你不从缓冲区中发出最后的项目。缓冲区将发出一组项目,但它会发出所有这些项目。

检查这个例子

@Test
public void stringBuffer() {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Observable.from(numbers)
.map(number -> "uniqueKey=" + number )
.buffer(5)
.map(ns -> String.join("&", ns))
.subscribe(System.out::println);

}

即使最后一个数字 10 是组中唯一的元素,它也会被发出。

您可以在此处查看更多缓冲区示例 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableBuffer.java

另外看看Window运算符,也许在您的用例中效果更好

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/transforming/ObservableWindow.java

关于rx-java - 如何在完成时刷新 Rx 缓冲区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41274705/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com