gpt4 book ai didi

node.js - 如何监控 RXJS 订阅数量?

转载 作者:IT老高 更新时间:2023-10-28 22:07:03 25 4
gpt4 key购买 nike

我正在使用 Observable 从全局资源中为客户端提供事件订阅接口(interface),我需要根据事件订阅的数量来管理该资源:

  • 当订阅数大于 0 时分配全局资源
  • 当订阅数变为0时释放全局资源
  • 根据订阅数量调整资源使用策略

RXJS 中监控活跃订阅数量的正确方法是什么?


如何在 RXJS 语法中实现以下内容? -

const myEvent: Observable<any> = new Observable();

myEvent.onSubscription((newCount: number, prevCount: number) => {
if(newCount === 0) {
// release global resource
} else {
// allocate global resource, if not yet allocated
}
// for a scalable resource usage / load,
// re-configure it, based on newCount
});

我不希望每次更改都有保证通知,因此 newCount + prevCount 参数。

UPDATE-1

这不是 this 的副本,因为我需要在订阅数量发生变化时得到通知,而不仅仅是在某个时候获取计数器。

UPDATE-2

目前没有任何答案,我很快想出了一个 very ugly and limited work-around ,通过完全封装,专门针对类型Subject。非常希望找到合适的解决方案。

UPDATE-3

经过几个答案,我仍然不确定如何实现我正在尝试的内容,如下:

class CustomType {

}

class CountedObservable<T> extends Observable<T> {

private message: string; // random property

public onCount; // magical Observable that needs to be implemented

constructor(message: string) {
// super(); ???
this.message = message;
}

// random method
public getMessage() {
return this.message;
}
}

const a = new CountedObservable<CustomType>('hello'); // can create directly

const msg = a.getMessage(); // can call methods

a.subscribe((data: CustomType) => {
// handle subscriptions here;
});

// need that magic onCount implemented, so I can do this:
a.onCount.subscribe((newCount: number, prevCont: number) => {
// manage some external resources
});

如何实现上面这样的CountedObservable类,让我订阅自己,以及它的onCount属性来监控它的客户端/订阅的数量?

UPDATE-4

所有建议的解决方案似乎都过于复杂,即使我接受了其中一个答案,我最终还是得到了 completely custom solution one of my own .

最佳答案

您可以使用 defer 来实现它跟踪订阅和finalize跟踪完成情况,例如作为运营商:

// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;

return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}

// just a stub for `onCountUpdate`
function noop(){}

然后像这样使用它:

const source$ = new Subject();

const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);

这是一个说明这一点的代码片段:

const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;


const source$ = new Subject();

const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);

// emit events
setTimeout(()=>{
source$.next('one');
}, 250);

setTimeout(()=>{
source$.next('two');
}, 1000);

setTimeout(()=>{
source$.next('three');
}, 1250);

setTimeout(()=>{
source$.next('four');
}, 1750);


// subscribe and unsubscribe
const subscriptionA = result$
.subscribe(value => console.log('A', value));

setTimeout(()=>{
result$.subscribe(value => console.log('B', value));
}, 500);


setTimeout(()=>{
result$.subscribe(value => console.log('C', value));
}, 1000);

setTimeout(()=>{
subscriptionA.unsubscribe();
}, 1500);


// complete source
setTimeout(()=>{
source$.complete();
}, 2000);


function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;

return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}

function noop(){}
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

* 注意:如果你的 source$ 是冷的——你可能需要 share它。

希望对你有帮助

关于node.js - 如何监控 RXJS 订阅数量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56195932/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com