gpt4 book ai didi

java - RxJava : How to Subscribe only if the Observable is Cold?

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:16:04 25 4
gpt4 key购买 nike

TL;DR:如何创建一个仅在时创建订阅并在时对任何其他订阅调用进行排队的 Observable?

我想创建一个一次只能执行一个订阅的 Observable。如果有任何其他订阅者订阅了 Observable,我希望他们在 Observable 完成时(在 onComplete 之后)排队运行。

我可以通过拥有某种堆栈并在每次 onComplete 时弹出堆栈来自己构建这个结构——但感觉这个功能已经存在于 RxJava 中。

有没有办法以这种方式限制订阅?

(More on hot and cold observables)

最佳答案

我认为没有内置运算符或运算符组合可以实现此目的。以下是我将如何实现它:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;

public final class SequenceSubscribers<T> implements Observable.OnSubscribe<T> {

final Observable<? extends T> source;

final Queue<Subscriber<? super T>> queue;

final AtomicInteger wip;

volatile boolean active;

public SequenceSubscribers(Observable<? extends T> source) {
this.source = source;
this.queue = new ConcurrentLinkedQueue<>();
this.wip = new AtomicInteger();
}

@Override
public void call(Subscriber<? super T> t) {
SubscriberWrapper wrapper = new SubscriberWrapper(t);
queue.add(wrapper);

t.add(wrapper);
t.add(Subscriptions.create(() -> wrapper.next()));

drain();
}

void complete(SubscriberWrapper inner) {
active = false;
drain();
}

void drain() {
if (wip.getAndIncrement() != 0) {
return;
}
do {
if (!active) {
Subscriber<? super T> s = queue.poll();
if (s != null && !s.isUnsubscribed()) {
active = true;
source.subscribe(s);
}
}
} while (wip.decrementAndGet() != 0);
}

final class SubscriberWrapper extends Subscriber<T> {
final Subscriber<? super T> actual;

final AtomicBoolean once;

public SubscriberWrapper(Subscriber<? super T> actual) {
this.actual = actual;
this.once = new AtomicBoolean();
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
next();
}

@Override
public void onCompleted() {
actual.onCompleted();
next();
}

@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}

void next() {
if (once.compareAndSet(false, true)) {
complete(this);
}
}
}

public static void main(String[] args) {
PublishSubject<Integer> ps = PublishSubject.create();

TestSubscriber<Integer> ts1 = TestSubscriber.create();
TestSubscriber<Integer> ts2 = TestSubscriber.create();

Observable<Integer> source = Observable.create(new SequenceSubscribers<>(ps));

source.subscribe(ts1);
source.subscribe(ts2);

ps.onNext(1);
ps.onNext(2);

ts1.assertValues(1, 2);
ts2.assertNoValues();

ts1.unsubscribe();

ps.onNext(3);
ps.onNext(4);
ps.onCompleted();

ts1.assertValues(1, 2);
ts2.assertValues(3, 4);
ts2.assertCompleted();
}
}

关于java - RxJava : How to Subscribe only if the Observable is Cold?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35292180/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com