gpt4 book ai didi

javascript - 从订阅 Observable 获取返回值

转载 作者:塔克拉玛干 更新时间:2023-11-02 21:13:14 27 4
gpt4 key购买 nike

使用 RxJS 5.0.0-rc.1,我尝试以类似于 how generators/iterators work 的方式与我的 ObserverObservable 进行通信通过使用 yield.next() 交换数据。目的是掌握对 .subscribe 的调用返回的内容,并据此修改/更新我的可观察流中的以下值。

我不完全确定这是否可能。不过,我发现您可以捕获在.subscribe 回调中抛出的异常。以下片段打印出 "Boom!":

var source = Observable.create((observer) => {
try {
observer.next(42);
} catch (e) {
// This will catch the Error
// thrown on the subscriber
console.log(e.message);
}
observer.complete();
});

source.subscribe(() => {
throw new Error('Boom!');
});

那么,如果订阅者返回一个值而不是抛出呢? Observable 有办法检索它吗?也许我正在以错误的方式接近这个。如果是这样,在这种情况下做事的“响应式(Reactive)”方式是什么?

非常感谢。


编辑

我想出的一种可能方法是为流中的每个项目提供回调函数。像这样的东西:

var source = Observable.create((observer) => {
// This will print "{ success: true }"
observer.next({ value: 42, reply: console.log });
observer.complete();
});

source.subscribe(({ value, reply }) => {
console.log('Got', value);
return reply({ success: true });
});

还有其他想法吗?


编辑 2

由于我最初的问题让我对我试图实现的目标产生了一些困惑,所以我将描述我的真实场景。我正在编写一个模块的 API,用于通过队列管理消息(很像一个简化的、内存中的 AMQP-RPC 机制),我认为 RxJS 会很合适。

它的工作方式与您预期的一样:Publisher 将消息推送到队列,然后将其传递给 Consumer。在术语中,Consumer 可以回复 Publisher,如果有兴趣,Publisher 可以收听该响应。

在理想情况下,API 应该是这样的:

Consumer().consume('some.pattern')
.subscribe(function(msg) {
// Do something with `msg`
console.log(msg.foo);
return { ok: true };
});

Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer

该示例将打印 42

回复Publisher 的逻辑在Consumer 函数中。但实际响应来自 .subscribe() 回调。这引出了我最初的问题:我应该如何从流的创建者那里获取返回值?

Consumer#consume() 视为:

/**
* Returns an async handler that gets invoked every time
* a new message matching the pattern of this consumer
* arrives.
*/
function waitOnMessage(observer) {
return function(msg) {
observer.next(msg);
// Conceptually, I'd like the returned
// object from `.subscribe()` to be available
// in this scope, somehow.
// That would allow me to go like:
// `sendToQueue(pubQueue, response);`
}
}

return Observable.create((observer) => {
queue.consume(waitOnMessage(observer));
});

这更有意义吗?

最佳答案

生成器和可观察对象之间确实存在相似之处。如你所见here ,可观察对象(值的异步序列)是可迭代对象(值的同步序列)的异步版本。

现在,生成器是一个返回Iterable 的函数。然而,Rxjs Observable 包含了一个生成器——又名生产者(你通过调用 subscribe 来执行/启动)和生成的异步值序列(你通过传递一个 观察者对象)。 subscribe 调用返回一个 Disposable,它允许您停止接收值(断开连接)。因此,虽然生成器和可观察对象是双重概念,但使用它们的 API 是不同的。

默认情况下,您不能使用 rxjs observable API 进行双向通信。您可能可以通过主题为自己构建反向 channel 来设法做到这一点(请注意,您必须有一个初始值才能启动循环)。

var backChannel = Rx.Subject();
backChannel.startWith(initialValue).concatMap(generateValue)
.subscribe(function observer(value){
// Do whatever
// pass a value through the backChannel
backChannel.next(someValue)
})
// generateValue is a function which takes a value from the back channel
// and returns a promise with the next value to be consumed by the observer.

你可以考虑用 :

function twoWayObsFactory (yield, initialValue) {
var backChannel = Rx.BehaviorSubject(initialValue);
var next = backChannel.next.bind(backChannel);
return {
subscribe : function (observer) {
var disposable = backChannel.concatMap(yield)
.subscribe(function(x) {
observer(next, x);
});
return {
dispose : function (){disposable.dispose(); backChannel.dispose();}
}
}
}
}

// Note that the observer is now taking an additional parameter in its signature
// for instance
// observer = function (next, yieldedValue) {
// doSomething(yieldedValue);
// next(anotherValue);
// }
// Note also that `next` is synchronous, as such you should avoir sequences
// of back-and-forth communication that is too long. If your `yield` function
// would be synchronous, you might run into stack overflow errors.
// All the same, the `next` function call should be the last line, so order of
// execution in your program is the same independently of the synchronicity of
// the `yield` function

否则,您描述的行为似乎是异步生成器的行为。我从来没有使用过这样的东西,但由于这是对某些 future 版本的 javascript 的提议,我认为你可以已经开始使用 Babel 进行尝试(参见 https://github.com/tc39/proposal-async-iteration )。

编辑:

如果您正在寻找一种环回机制(不太通用的方法,但可以非常适合您的用例,如果您想要做的事情足够简单),expand 运算符可以提供帮助.要了解其行为,请查看 doc ,以及以下关于 SO 的答案,以获取在具体上下文中使用的示例:

基本上 expand 允许您向下游发出一个值并同时在您的生产者中反馈该值。

关于javascript - 从订阅 Observable 获取返回值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40198748/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com