gpt4 book ai didi

javascript - 在 RxJS 中,Observer 是否被注入(inject)到 Observable 执行中?

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:29:43 25 4
gpt4 key购买 nike

我已通读 ReactiveX文档多次,但我仍然无法准确理解观察者订阅 Observable 时会发生什么。

让我们看一个简单的例子:

import { Observable } from 'rxjs'; 

const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.complete();
});

const observer = {
next: (x) => console.log('got value ' + x),
error: (err) => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};

observable.subscribe(observer);

StackBlitz code .

我的问题:
subscriber 在哪里传递给 Observable 的对象从何而来?

来自 RxJS documentation :

It is not a coincidence that observable.subscribe and subscribe in new Observable(function subscribe(subscriber) {...}) have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.



因此,显然传递给 Observable 构造函数( subscriber )中订阅回调的对象是 不是 实际上是 observer目的。至少如果你引用上面关于图书馆实际工作方式的引述,至少不会。

如果不是 observer传入的对象,那么 subscriber.next(1) 到底是什么和 subscribe.complete()打电话?那如何连接到 next observer 中的属性(property)?

澄清编辑:

我知道如何使用 RxJS 并且确实可以从概念上想象观察者是被注入(inject)的(正如引用所说)。但是,我在这里希望了解它的实际工作原理。

最佳答案

Observable创建流程如下:

一个 Observable由作者定义(此处手动使用 new ,用于解释):

const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
subscribe回调传递给 Observable以上由 Observable constructor 本地保存:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}

所以,我们有整个 subscribe功能,由我们或任何其他预制 Observable 定义,保存下来以供以后执行。

可以将观察者传递给 subscribe几种形式之一的回调。要么直接作为一到三个函数(next、error、complete),要么作为具有相同三个方法中的一个或多个的对象。出于解释的目的,我们将实现最后一个更详细的选项:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}

现在,有趣的部分开始了。我们通过 observer进入 Observable.subscribe(..)方法:
myObserver.subscribe(observer);

subscribe method看起来像这样:
  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {


const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);


if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}


if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}


return sink;
}

简要说明, subscribe方法:
  • 收到observer以之前讨论过的一种形式
  • toSubscriber将观察者转换为 Subscriber对象,无论其传递形式如何(Subscriber 实例保存在 sink 变量中)
  • 注:operator变量是 undefined ,除非您订阅运营商。因此,只需忽略 if operator 周围的陈述
  • Subscriber扩展(原型(prototype)链接到)Subscription对象,它的原型(prototype)上有两个重要的方法:unsubscribe() , add()
  • add(..)用于将“拆卸逻辑”(函数)添加到 Observable ,它将在 Observable 时运行完成或取消订阅。它将接受传递给它的任何函数,将其包装在 Subscription 中对象,并将函数放入 Subscription_unsubscribe多变的。这个Subscription保存在 Subscriber我们在上面创建了一个名为 _subscriptions 的变量.如前所述,我们这样做是为了当 Subscriber已取消订阅或已完成,所有 add() 'ed 拆除逻辑执行
  • 作为旁注,Observable.subscribe()返回 Subscriber实例。因此,您可以调用mySubscriber.add( // some tear down logic)随时在其上添加将在 Observable 时执行的函数已完成或已取消订阅
  • 现在包含一个重要部分:this._trySubscribe(sink)运行(在 add() 内,作为参数)。 _trySubscribe(..)是实际运行 subscribe 的函数之前由 Observable 保存的回调构造函数。重要的是,它传入 sink (我们的新 Subscriber 实例)作为对 Observable 的回调打回来。换句话说,当 subscriber.next(1)里面Observable执行,我们实际上是在执行 next(1)sink ( Subscriber )实例( next()Subscriber 的原型(prototype)上)。

  • 所以,这就带我到最后,现在。 toSubscribe里面有更多细节以及取消订阅流程等,但这些超出了本问答的范围。

    简而言之,为了回答标题中的问题,观察者确实被传递到 Observable 中。 , 只是在转换为统一的 Subscriber 之后目的。

    希望这将在 future 对其他人有所帮助。

    关于javascript - 在 RxJS 中,Observer 是否被注入(inject)到 Observable 执行中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54446138/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com