
javascript - Why is my published deferred Observable factory called multiple times?

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 20:30:27

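I have a stream of tasks that stays queued until a signal Subject fires, using the .zip() operator. The signal Subject subscribes to the currently running task. I am also trying to observe the task's progress emissions.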

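What I tried is to multicast the task Observable with .publish(), so that the signal Subject can subscribe to the task's .last() emission (which triggers a dequeue) while I also subscribe to the task's general progress emissions.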

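That seems to work. However, when I look at what gets printed, my Observable factory appears to be invoked by every .subscribe() call, even though I used .publish(). Am I misunderstanding how multicasting works? I believed the .publish()ed Observable would be created by the factory once and that single instance would be shared, staying inactive until .connect() is called.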

My task runner

Note the call to Observable.defer() around tasker()

"use strict";

const {
  Observable,
  Subject,
  BehaviorSubject
} = Rx;

// How often to increase progress in a task (milliseconds)
const INTERVAL_TIME = 200;

// Keep track of how many tasks we have
let TASK_ID = 0;

// Easy way to print out observers
function easyObserver(prefix = "Observer") {
  return {
    next: data => console.log(`[${prefix}][next]: ${data}`),
    error: err => console.error(`[${prefix}][error] ${err}`),
    complete: () => console.log(`[${prefix}][complete] Complete`)
  };
}

// Simulate async task
function tasker(name = "", id = TASK_ID++) {
  console.log(`tasker called for ${id}`);

  let progress = 0;
  const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
  console.log(`Task[${name||id}][started]`);
  let interval = setInterval(() => {
    progress = (progress + (Math.random() * 50));
    if (progress >= 100) {
      progress = 100;
      clearInterval(interval);
      progress$.next(`Task[${name||id}][${progress}%]`);
      progress$.complete();
      return;
    }
    progress$.next(`Task[${name||id}][${progress}%]`);
  }, INTERVAL_TIME);

  return progress$.asObservable();
}

// Create a signal subject that will tell the queue when to next.
// A BehaviorSubject emits its initial value (undefined here) right away,
// which releases the first task.
const dequeueSignal = new BehaviorSubject();

// Make some tasks
const tasks$ = Observable
  .range(0, 3);

// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
  .zip(tasks$, dequeueSignal, (i, s) => i);

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => Observable.defer(() => tasker(`MyTask${task}`)))
  .publish();

// Print out the task progress
const progressSubscription = mcQueuedTasks$
  .switchMap(task => task)
  .subscribe(easyObserver("queuedTasks$"));

// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
  .switchMap(task => task.last())
  .delay(500)
  .subscribe(dequeueSignal);

// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

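My output

Notice the multiple calls to tasker: the line tasker called for N is printed and the body of the factory runs. However, before any progress emissions occur, tasker() is called again with the next TASK_ID. The output itself seems correct, because the Task[MyTaskN] names do not skip any indices; only TASK_ID does.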

tasker called for 0
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][0%]
tasker called for 1
Task[MyTask0][started]
[queuedTasks$][next]: Task[MyTask0][20.688413934455674%]
[queuedTasks$][next]: Task[MyTask0][32.928520335195564%]
[queuedTasks$][next]: Task[MyTask0][42.58361384849108%]
[queuedTasks$][next]: Task[MyTask0][73.1297043008671%]
[queuedTasks$][next]: Task[MyTask0][100%]
tasker called for 2
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][0%]
tasker called for 3
Task[MyTask1][started]
[queuedTasks$][next]: Task[MyTask1][37.16513927245511%]
[queuedTasks$][next]: Task[MyTask1][47.27771448102375%]
[queuedTasks$][next]: Task[MyTask1][60.45983311604027%]
[queuedTasks$][next]: Task[MyTask1][100%]
tasker called for 4
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][0%]
tasker called for 5
Task[MyTask2][started]
[queuedTasks$][next]: Task[MyTask2][32.421275902708544%]
[queuedTasks$][next]: Task[MyTask2][41.30332084025583%]
[queuedTasks$][next]: Task[MyTask2][77.44113197852694%]
[queuedTasks$][next]: Task[MyTask2][100%]
[queuedTasks$][complete] Complete

Best answer

It looks like Observable.defer is unnecessary in this function:

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => Observable.defer(() => tasker(`MyTask${task}`)))
  .publish();

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
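
The per-subscriber behaviour described in that quote can be illustrated without RxJS at all. The following is a minimal plain-JavaScript sketch; `defer` and `of` here are hypothetical, simplified stand-ins written for this example, not the RxJS implementations, and an "observable" is assumed to be just an object with a `subscribe(observer)` method.

```javascript
// Hypothetical, simplified stand-in for Observable.defer (not RxJS itself).
function defer(factory) {
  return {
    subscribe(observer) {
      // The factory runs on every subscribe call, so each subscriber
      // gets its own freshly created source.
      return factory().subscribe(observer);
    }
  };
}

// Hypothetical source that emits a single value synchronously.
function of(value) {
  return {
    subscribe(observer) {
      observer.next(value);
      observer.complete();
    }
  };
}

let factoryCalls = 0;
const deferred = defer(() => {
  factoryCalls += 1;
  return of(`created by call #${factoryCalls}`);
});

const received = [];
deferred.subscribe({ next: v => received.push(v), complete() {} });
deferred.subscribe({ next: v => received.push(v), complete() {} });

console.log(factoryCalls); // 2
console.log(received);     // [ 'created by call #1', 'created by call #2' ]
```

Two subscriptions produce two factory calls, which is the same pattern as the doubled tasker called for N lines in the question's output.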

You have already created an Observable here:

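// Make some tasks
const tasks$ = Observable
  .range(0, 3);

Inside the map callback you are creating an additional Observable for each task. .publish() shares only the outer stream of tasks; each switchMap still subscribes on its own to the deferred Observable it receives, so the defer factory (and therefore tasker()) runs once per subscriber.

Remove Observable.defer so that the function looks like this: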

// Create task observables
const mcQueuedTasks$ = queuedTasks$
  .map(task => tasker(`MyTask${task}`))
  .publish();
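
To see why dropping Observable.defer changes the number of tasker() calls, here is a rough plain-JavaScript model of the two variants. Everything in it (`defer`, `publish`, `outer`, `run`, and the counting `tasker`) is a hypothetical simplification written for this sketch, not RxJS code; it only assumes that a published source is subscribed once on connect() and that each consumer subscribes to the inner task it receives, as the two switchMap chains do.

```javascript
// Hypothetical helpers, not RxJS itself.
function defer(factory) {
  return { subscribe: observer => factory().subscribe(observer) };
}

// Minimal multicast: the source is subscribed once on connect(), and
// every emitted value is forwarded to all registered observers.
function publish(source) {
  const observers = [];
  return {
    subscribe: observer => { observers.push(observer); },
    connect: () => source.subscribe({ next: v => observers.forEach(o => o.next(v)) })
  };
}

let taskerCalls = 0;
// Stand-in for tasker(): counts its invocations and returns a one-value task.
function tasker(name) {
  taskerCalls += 1;
  return { subscribe: observer => observer.next(`${name} done`) };
}

// Outer source that emits one inner task, built by the given function.
function outer(makeTask) {
  return { subscribe: observer => observer.next(makeTask("MyTask0")) };
}

function run(makeTask) {
  taskerCalls = 0;
  const shared = publish(outer(makeTask));
  // Two consumers, each subscribing to the inner task it is handed.
  shared.subscribe({ next: task => task.subscribe({ next: () => {} }) });
  shared.subscribe({ next: task => task.subscribe({ next: () => {} }) });
  shared.connect();
  return taskerCalls;
}

const withDefer = run(name => defer(() => tasker(name)));
const withoutDefer = run(name => tasker(name));
console.log({ withDefer, withoutDefer }); // { withDefer: 2, withoutDefer: 1 }
```

In this model the outer mapping runs once in both cases; the difference is only whether the emitted value is a lazy factory that each consumer triggers again, or an already created task that both consumers share.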

Snippet:

"use strict";

const {
  Observable,
  Subject,
  BehaviorSubject
} = Rx;

// How often to increase progress in a task (milliseconds)
const INTERVAL_TIME = 200;

// Keep track of how many tasks we have
let TASK_ID = 0;

// Easy way to print out observers
function easyObserver(prefix = "Observer") {
  return {
    next: data => console.log(`[${prefix}][next]: ${data}`),
    error: err => console.error(`[${prefix}][error] ${err}`),
    complete: () => console.log(`[${prefix}][complete] Complete`)
  };
}

// Simulate async task
function tasker(name = "", id = TASK_ID++) {
  console.log(`tasker called for ${id}`);

  let progress = 0;
  const progress$ = new BehaviorSubject(`Task[${name||id}][${progress}%]`);
  console.log(`Task[${name||id}][started]`);
  let interval = setInterval(() => {
    progress = (progress + (Math.random() * 50));
    if (progress >= 100) {
      progress = 100;
      clearInterval(interval);
      progress$.next(`Task[${name||id}][${progress}%]`);
      progress$.complete();
      return;
    }
    progress$.next(`Task[${name||id}][${progress}%]`);
  }, INTERVAL_TIME);

  return progress$.asObservable();
}

// Create a signal subject that will tell the queue when to next.
// A BehaviorSubject emits its initial value (undefined here) right away,
// which releases the first task.
const dequeueSignal = new BehaviorSubject();

// Make some tasks
const tasks$ = Observable
  .range(0, 3);

// Queue tasks until signal tells us to emit the next task
const queuedTasks$ = Observable
  .zip(tasks$, dequeueSignal, (i, s) => i);

// Create task observables: tasker() now runs once per queued task,
// and both subscribers below share the Observable it returns
const mcQueuedTasks$ = queuedTasks$
  .map(task => tasker(`MyTask${task}`))
  .publish();

// Print out the task progress
const progressSubscription = mcQueuedTasks$
  .switchMap(task => task)
  .subscribe(easyObserver("queuedTasks$"));

// Cause the signal subject to trigger the next task
const taskCompleteSubscription = mcQueuedTasks$
  .switchMap(task => task.last())
  .delay(500)
  .subscribe(dequeueSignal);

// Kick everything off
mcQueuedTasks$.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

Hope this helps.

Regarding "javascript - Why is my published deferred Observable factory called multiple times?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48175350/
