- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我有一个任务流,它将排队直到使用 .zip()
运算符触发信号 Subject。信号主题订阅当前正在运行的任务。我也在尝试观察任务的进度排放。
我尝试做的是使用 .publish()
多播任务 Observable 以便我可以允许信号 Subject 订阅 .last()
任务的排放导致出队并订阅任务的一般进度排放。
这似乎有效。但是,每当我查看打印出来的内容时,我的 Observable 工厂似乎都被每个 .subscribe()
调用调用,即使我使用了 .publish()
。我误解了多播的工作原理吗?我相信 .publish()
ed Observable 将与工厂一起创建,并且将共享单个实例,但直到 .connect()
被调用为止。
注意调用 tasker
的 .defer()
。
"use strict";
const {
Observable,
Subject,
BehaviorSubject
} = Rx;
// How often to increase project in a task
const INTERVAL_TIME = 200;
// Keep track of how many tasks we have
let TASK_ID = 0;
// Easy way to print out observers
function easyObserver(prefix = "Observer") {
return {
next: data => console.log(`[${prefix}][next]: ${data}`),
error: err => console.error(`[${prefix}][error] ${err}`),
complete: () => console.log(`[${prefix}][complete] Complete`)
};
}
// Simulate async task
function tasker(name = "", id = TASK_ID++) {
console.log(`tasker called for ${id}`);
let progress = 0;
const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
console.log(`Task[${name||id}][started]`);
let interval = setInterval(() => {
progress = (progress + (Math.random() * 50));
if (progress >= 100) {
progress = 100;
clearInterval(interval);
progress$.next(`Task[${name||id}][${progress}%]`);
progress$.complete();
return;
}
progress$.next(`Task[${name||id}][${progress}%]`);
}, INTERVAL_TIME);
return progress$.asObservable();
}
// Create a signal subject that will tell the queue when to next
const dequeueSignal = new BehaviorSubject();
// Make some tasks
const tasks$ = Observable
.range(0, 3);
// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
.zip(tasks$, dequeueSignal, (i, s) => i);
// Create task observables
const mcQueuedTasks$ = queuedTasks$
.map(task => Observable.defer(() => tasker(`MyTask${task}`)))
.publish();
// Print out the task progress
const progressSubscription = mcQueuedTasks$
.switchMap(task => task)
.subscribe(easyObserver("queuedTasks$"));
// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
.switchMap(task => task.last())
.delay(500)
.subscribe(dequeueSignal);
// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
请注意您是如何看到多次调用 tasker 的,这行 tasker called for N
并且调用了工厂的主体。然而,在任何进度排放发生之前,tasker()
将使用下一个 TASK_ID
再次调用。输出似乎是正确的,因为 Task[MyTask0]
不会跳过任何索引,只会跳过 TASK_ID
。
tasker called for 0
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][0%]
tasker called for 1
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][20.688413934455674%]
[queuedTasks$][next]: Task[MyTask0][32.928520335195564%]
[queuedTasks$][next]: Task[MyTask0][42.58361384849108%]
[queuedTasks$][next]: Task[MyTask0][73.1297043008671%]
[queuedTasks$][next]: Task[MyTask0][100%]
tasker called for 2
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][0%]
tasker called for 3
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][37.16513927245511%]
[queuedTasks$][next]: Task[MyTask1][47.27771448102375%]
[queuedTasks$][next]: Task[MyTask1][60.45983311604027%]
[queuedTasks$][next]: Task[MyTask1][100%]
tasker called for 4
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][0%]
tasker called for 5
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][32.421275902708544%]
[queuedTasks$][next]: Task[MyTask2][41.30332084025583%]
[queuedTasks$][next]: Task[MyTask2][77.44113197852694%]
[queuedTasks$][next]: Task[MyTask2][100%]
[queuedTasks$][complete] Complete
最佳答案
看起来 Observable.defer
在此函数中是不必要的:
// Create task observables
const mcQueuedTasks$ = queuedTasks$
.map(task => Observable.defer(() => tasker(`MyTask${task}`)))
.publish();
The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
你已经在这里创建了一个 Observable:
// Make some tasks
const tasks$ = Observable
.range(0, 3);
在 map
循环中,您正在为每个任务创建一个额外的 Observable...
去掉 Observable.defer
这样函数看起来像这样:
// Create task observables
const mcQueuedTasks$ = queuedTasks$
.map(task => tasker(`MyTask${task}`))
.publish();
片段:
"use strict";
const {
Observable,
Subject,
BehaviorSubject
} = Rx;
// How often to increase project in a task
const INTERVAL_TIME = 200;
// Keep track of how many tasks we have
let TASK_ID = 0;
// Easy way to print out observers
function easyObserver(prefix = "Observer") {
return {
next: data => console.log(`[${prefix}][next]: ${data}`),
error: err => console.error(`[${prefix}][error] ${err}`),
complete: () => console.log(`[${prefix}][complete] Complete`)
};
}
// Simulate async task
function tasker(name = "", id = TASK_ID++) {
console.log(`tasker called for ${id}`);
let progress = 0;
const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
console.log(`Task[${name||id}][started]`);
let interval = setInterval(() => {
progress = (progress + (Math.random() * 50));
if (progress >= 100) {
progress = 100;
clearInterval(interval);
progress$.next(`Task[${name||id}][${progress}%]`);
progress$.complete();
return;
}
progress$.next(`Task[${name||id}][${progress}%]`);
}, INTERVAL_TIME);
return progress$.asObservable();
}
// Create a signal subject that will tell the queue when to next
const dequeueSignal = new BehaviorSubject();
// Make some tasks
const tasks$ = Observable
.range(0, 3);
// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
.zip(tasks$, dequeueSignal, (i, s) => i);
// Create task observables
const mcQueuedTasks$ = queuedTasks$
.map(task => tasker(`MyTask${task}`))
.publish();
// Print out the task progress
const progressSubscription = mcQueuedTasks$
.switchMap(task => task)
.subscribe(easyObserver("queuedTasks$"));
// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
.switchMap(task => task.last())
.delay(500)
.subscribe(dequeueSignal);
// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
希望对您有所帮助。
关于javascript - 为什么我发布的延迟 Observable 工厂被多次调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48175350/
一段时间后,我阅读了有关 RxJava concat 的内容,并决定测试一下我的理解力。但是我遇到了一些我不太理解的行为。 问题是,当我连接两个可观察对象时,根据我将它们传递给 Observable.
我正在使用来自数据库服务的数据实现自动完成: @Injectable() export class SchoolService { constructor(private db: AngularF
我正在尝试使用 RxJS 创建一个可观察的对象,它可以执行如图所示的操作。 获取一个值并等待一段固定的时间才能获得 下一个。 下一个将是该周期内发出的最后一个值 等等,跳过其余部分。 如果等待时间间隔
我有一个可观察对象和另一个提供的可观察对象改变 key 。我想构建一个在之间切换的可观察对象基于该键的对象中的可观察值。 示例: // Choose randomly between "up" or
我使用 protobuffers 在我的前端和我的 Dart 服务器之间进行通信。 那些对象没有实现 Observable . 我的 Dart 聚合物对象看起来像: @CustomTag('user-
在 java swing 项目中,我有一个模型类,它保存某个 JPanel 的状态。我需要使这些数据可供 View 使用。我认为有两种选择。有一个扩展 Observable 的类并将模型作为实例变量。
我想找到一种方法来检测观察者是否已完成使用我使用 Rx.Observable.create 创建的自定义可观察对象,以便自定义可观察对象可以结束它并正确地进行一些清理。 因此,我创建了一些测试代码,如
我正在尝试查询数据库。迭代结果列表,并为每一项再执行一个请求。在 rxjs 构建结束时,我有 Observable[]> 。但我需要Observable 。如何做到这一点? this.caseServ
我希望我的 api 上有一个方法返回 Observable> 但我希望该方法中的代码知道所有包含的 Observables 是否已完成,以便它可以关闭某些内容。最好的方法是什么? 更明确地说,我希望完
我有两个方法返回 Observable> firstAPI.getFirstInfo("1", "2"); secondApi.getSecondInfo(resultFirstObservable,
我有一个 Observable返回单个 Cursor实例(Observable)。我正在尝试利用 ContentObservable.fromCursor获取 onNext 中每个游标的行回调。 我想
我有两种返回 Observable 的方法: Observable firstObservable(); Observable secondObservable(String value); 对于第一
我正在尝试创建一个将用户数据作为 Observable 的函数,并使用来自第一个 observable 的数据从查询中添加/合并数据,然后将所有这些数据作为一个 observable 返回,我可以这样
我有一个 spec-compliant ECMAScript Observable ,具体来自 wonka library .我正在尝试将这种类型的 observable 转换为 rxjs 6 obs
为了简化问题,我在这里使用了数字和字符串。代码: const numbers$:Observable = of([1,2,3]); const strings: string[] = ["a","b"
对于我的 Android 应用程序,我需要一个 Observable 来聚合来自 7 个不同搜索的结果并作为一个集合发出。 对于最终发射,我选择了 ListMultimap其中 Content是搜索结
我正在使用改造 2.0.0-beta2 并且调试构建工作正常,但我在使用 Proguard 发布构建时遇到以下错误。 这是更新后的 logcat 错误。 11-17 18:23:22.751 1627
observer.throw(err) 和 observer.error(err) 有什么区别? 我正在使用 RxJS 版本“5.0.0-beta.12” var innerObservable =
我们有一种情况,对服务的方法调用返回一个 IObservable但我们的客户期望 IObservable .将 T1 转换为 T2 很简单。 Rx 中有什么允许这样做的吗? (即链接观察者) 我知道我
我陷入了如何将以下可观察类型转换/转换为我的目标类型的困境: 我有可观察的类型: Observable>> 我想将其转换为: Observable> 所以当我订阅它时,它会发出 List不是Obser
我是一名优秀的程序员,十分优秀!