gpt4 book ai didi

java - RxJava : buffer items until some condition is true for current item

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:08:17 25 4
gpt4 key购买 nike

这是我试图找出的一个片段:

class RaceCondition {

Subject<Integer, Integer> subject = PublishSubject.create();

public void entryPoint(Integer data) {
subject.onNext(data);
}

public void client() {
subject /*some operations*/
.buffer(getClosingSelector())
.subscribe(/*handle results*/);
}

private Observable<Integer> getClosingSelector() {
return subject /* some filtering */;
}
}

有一个 Subject 接受来自外部的事件。有一个订阅此主题的客户端处理事件并缓冲它们。这里的主要思想是每次都应根据使用流中的项目计算出的某些条件发出缓冲项目。

为此,缓冲区边界本身会监听主题。

一个重要的期望行为:每当边界发出项目时,它也应该包含在 buffer 的后续发射中。当前配置不是这种情况,因为项目(至少我是这么认为的)是在它到达 buffer 之前从结束选择器发出的,因此它不包括在内在当前排放中,但被留在后面等待下一个排放。

有没有一种方法可以让关闭选择器首先等待项目被缓冲?如果没有,是否有另一种方法可以根据下一个传入项目缓冲和释放项目?

最佳答案

如果我理解正确,你想缓冲直到某些谓词允许它基于项目。您可以使用一组复杂的运算符来完成此操作,但编写自定义运算符可能更容易:

public final class BufferUntil<T> 
implements Operator<List<T>, T>{

final Func1<T, Boolean> boundaryPredicate;

public BufferUntil(Func1<T, Boolean> boundaryPredicate) {
this.boundaryPredicate = boundaryPredicate;
}

@Override
public Subscriber<? super T> call(
Subscriber<? super List<T>> child) {
BufferWhileSubscriber parent =
new BufferWhileSubscriber(child);
child.add(parent);
return parent;
}

final class BufferWhileSubscriber extends Subscriber<T> {
final Subscriber<? super List<T>> actual;

List<T> buffer = new ArrayList<>();

/**
* @param actual
*/
public BufferWhileSubscriber(
Subscriber<? super List<T>> actual) {
this.actual = actual;
}

@Override
public void onNext(T t) {
buffer.add(t);
if (boundaryPredicate.call(t)) {
actual.onNext(buffer);
buffer = new ArrayList<>();
}
}

@Override
public void onError(Throwable e) {
buffer = null;
actual.onError(e);
}

@Override
public void onCompleted() {
List<T> b = buffer;
buffer = null;
if (!b.isEmpty()) {
actual.onNext(b);
}
actual.onCompleted();
}
}
}

关于java - RxJava : buffer items until some condition is true for current item,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35881227/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com