gpt4 book ai didi

java - RxJava 反跳操作符取决于消息

转载 作者:行者123 更新时间:2023-11-30 02:11:52 26 4
gpt4 key购买 nike

我研究 RxJava 并尝试了解如何实现非标准响应式(Reactive)“debouce”逻辑。取决于消息,新运算符(operator)必须延迟某种类型的消息,或者如果从可观察到的另一种类型的消息到达则跳过它。

Debouce only A-messages or forget about it if another message arrived

请帮我编写这个逻辑。

最佳答案

这需要一个不平凡的运算符组合:

public static <T> ObservableTransformer<T, T> debounceOnly(
Predicate<? super T> condition, long time,
TimeUnit unit, Scheduler scheduler) {
return o -> o.publish(f ->
f.concatMapEager(v -> {
if (condition.test(v)) {
return Observable.just(v).delay(time, unit, scheduler).takeUntil(f);
}
return Observable.just(v);
})
);
}


@Test
public void test() {
PublishSubject<String> subject = PublishSubject.create();

TestScheduler sch = new TestScheduler();

subject
.compose(debounceOnly(v -> v.startsWith("A"),
100, TimeUnit.MILLISECONDS, sch))
.subscribe(System.out::println, Throwable::printStackTrace,
() -> System.out.println("Done"));

subject.onNext("A1");

sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

subject.onNext("B1");
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

subject.onNext("C1");
sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

subject.onNext("A2");
sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

subject.onNext("A3");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

subject.onNext("A4");
sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

subject.onNext("B2");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

subject.onNext("C2");
sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

subject.onComplete();
}

关于java - RxJava 反跳操作符取决于消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49827715/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com