gpt4 book ai didi

javascript - 如何从 AsyncSubject(消费者模式)订阅一次元素

转载 作者:数据小太阳 更新时间:2023-10-29 03:57:08 25 4
gpt4 key购买 nike

rxjs5 中,我有一个 AsyncSubject 并想多次订阅它,但只有一个订阅者应该收到 next() 事件。所有其他人(如果他们尚未取消订阅)应立即获得 complete() 事件,而无需 next()

例子:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);

最佳答案

您可以通过编写一个包装初始 AsyncSubject

的小类轻松实现这一点
import {AsyncSubject, Subject, Observable, Subscription} from 'rxjs/RX'

class SingleSubscriberObservable<T> {
private newSubscriberSubscribed = new Subject();

constructor(private sourceObservable: Observable<T>) {}

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSubscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}
}

然后您可以在您的示例中尝试一下:

const as = new AsyncSubject();
const single = new SingleSubscriberObservable(as)

let fired = false;

function setFired(label:string){
return ()=>{
if(fired == true) throw new Error("Multiple subscriptions executed");
console.log("FIRED", label);
fired = true;
}
}

function logDone(label: string){
return ()=>{
console.log(`${label} Will stop subscribing to source observable`);
}
}

const subscription1 = single.subscribe(setFired('First'), ()=>{}, logDone('First'));
const subscription2 = single.subscribe(setFired('Second'), ()=>{}, logDone('Second'));
const subscription3 = single.subscribe(setFired('Third'), ()=>{}, logDone('Third'));

setTimeout(()=>{
as.next(undefined);
as.complete();
}, 500)

这里的关键是这部分:

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
this.newSubscriberSusbscribed.next();
return this.sourceObservable.takeUntil(this.newSubscriberSubscribed).subscribe(next, error, complete);
}

每次有人调用订阅时,我们都会向 newSubscriberSubscribed 主题发送信号。

当我们订阅我们使用的底层 Observable 时

takeUntil(this.newSubscriberSubscribed)

这意味着当下一个订阅者调用时:

this.newSubscriberSubscribed.next()

先前返回的可观察对象将完成。

因此,这将导致您所问的是,每当出现新订阅时,先前的订阅就会完成。

应用程序的输出将是:

First Will stop subscribing to source observable
Second Will stop subscribing to source observable
FIRED Third
Third Will stop subscribing to source observable

编辑:

如果您想在第一个订阅者保持订阅并且所有 future 订阅立即完成的情况下执行此操作(这样当最早的订阅者仍然订阅时,没有其他人可以订阅)。你可以这样做:

class SingleSubscriberObservable<T> {
private isSubscribed: boolean = false;

constructor(private sourceObservable: Observable<T>) {}

subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription {
if(this.isSubscribed){
return Observable.empty().subscribe(next, error, complete);
}
this.isSubscribed = true;
var unsubscribe = this.sourceObservable.subscribe(next, error, complete);
return new Subscription(()=>{
unsubscribe.unsubscribe();
this.isSubscribed = false;
});
}
}

我们保留一个标志 this.isSusbscribed 来跟踪当前是否有人订阅。我们还返回一个自定义订阅,我们可以使用它在取消订阅时将此标志设置回 false。

每当有人尝试订阅时,如果我们改为将他们订阅到一个空的 Observable ,它将立即完成。输出看起来像:

Second Will stop subscribing to source observable
Third Will stop subscribing to source observable
FIRED First
First Will stop subscribing to source observable

关于javascript - 如何从 AsyncSubject(消费者模式)订阅一次元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40278971/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com