gpt4 book ai didi

RxJS 服务调用限制/排队

转载 作者:行者123 更新时间:2023-12-02 04:41:20 26 4
gpt4 key购买 nike

我正在尝试使用 RxJS 来实现服务调用限制/排队。

例如,Google Maps 的 Geocoder API。假设我不希望它每秒被调用一次以上,但是我的应用程序的一个或多个部分可能会比​​这更频繁地请求地理编码。我希望请求排队,相邻请求至少相隔 1 秒,但如果在此等待期间不再需要请求,我也希望能够“取消”请求。

这是 RxJS 的适用用途吗,如果是这样,它会是什么样子?

谢谢。

最佳答案

这里有一些东西可以指导你( jsfiddle ):

// Helper functions
function remove_from_queue(queue, id) {
queue.forEach(function(x, index){
if (x.execute.request === id) {
queue.splice(index, 1);
}
});
// console.log('queue after removal', queue);
}

function add_to_queue (queue, id){
queue.push({execute : {request : id}});
}

function getFirstInQueue(queue){
return queue[0];
}

function noop(x) {}

function log(label) {
return function (x) {
console.log.call(console, label, x);
}
}

function timestamp(label){
return function (x) {
console.log.call(console, Date.now() - startingDate, label,x );
}
}

function label(label){
return function (x) {
var res = {};
res[label] = x;
return res;
}
}

var startingDate = Date.now();

var requests$ = Rx.Observable.generateWithRelativeTime(
{request : 1},
function (x) { return x.request < 10; },
function (x) { return {request : x.request + 1}; },
function (x) { return x; },
function (x) { return 100 ; }
);

var cancelledRequests$ = Rx.Observable.generateWithRelativeTime(
{request : 1},
function (x) { return x.request < 20; },
function (x) { return {request : x.request + 4}; },
function (x) { return x; },
function (x) { return 500 ; }
);

var timer$ = Rx.Observable.interval(990).map(function (){return {}}).take(10);

var source$ = Rx.Observable.merge(
requests$.map(label('execute')),
cancelledRequests$.map(label('cancel')),
timer$
)
//.do(log('source'));

controlledSource$ = source$
.scan(function (state, command){
var requestsToExecuteQueue = state.requestsToExecuteQueue;
if (command.cancel) {
remove_from_queue(requestsToExecuteQueue, command.cancel.request);
}
if (command.execute) {
add_to_queue(requestsToExecuteQueue, command.execute.request);
}
console.log('queue', requestsToExecuteQueue.slice())

return {
command : command,
requestExec$ : Rx.Observable
.return(getFirstInQueue(requestsToExecuteQueue))
.filter(function(x){return x})
.do(function(x){remove_from_queue(requestsToExecuteQueue, x.execute.request)}),
requestsToExecuteQueue : requestsToExecuteQueue
}
}, {command : undefined, requestExec$ : undefined, requestsToExecuteQueue : []})
.pluck('requestExec$')
.sample(Rx.Observable.interval(1000))
.mergeAll();

controlledSource$.do(timestamp('executing request:')).subscribe(noop)

基本上 :
  • scan用于管理状态(请求队列、添加删除)
  • 对于每个请求,我们传递一个 observable,它(订阅时)释放队列的第一个元素,并从队列中删除该元素
  • sample用于每秒获得一个这样的可观察对象
  • mergeAll允许订阅那个可观察的
  • 我们必须使用 timer$对象即使在请求源完成后继续轮询队列(您仍然需要清空剩余请求的队列)。例如,您可以通过在源完成后让 timer$ 发出 X 秒或最适合您的任何内容来使该逻辑适应您的实际情况。
  • 关于RxJS 服务调用限制/排队,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37139923/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com