gpt4 book ai didi

rxjs - 接收 : Buffer with variable back pressured buffer time

转载 作者:行者123 更新时间:2023-12-04 17:47:05 28 4
gpt4 key购买 nike

我玩过一点 Rx,但仍然认为自己对这个世界来说是个新手。我有一个问题,我想知道我是否可以通过 Rx 解决它。我最初的用例是在 C# 中,但以后可能希望在 JS 中使用相同的用例(尽管如果答案中有任何代码片段,任何伪代码语言都可以)

目前我有一个客户端应用程序,它只是缓冲它创建的数据(交易),并每 5 秒发送一次到服务器。此数据可以包含许多单独的交易,例如它已经存储了很多,现在上线了,所以我们可能要向服务器发送数千个。当像刚才描述的那样发送很多时,8 秒的延迟和缓冲就可以了(反正数据已经延迟了)。然而,当这个客户端连接时,例如,我们实时创建一个交易(即只有 1 或 2 个单笔交易),我希望能够立即发送这些,即不要等待 8 秒。

所以它看起来类似于 Rx buffer , 可能与 debounce 交叉?我试图绘制一个弹珠图来帮助解释使用高级图形包(不是)绘画。

enter image description here

所以,为了完成这个过程,第一个 red 弹珠被接收并立即转发。接下来 黄色 弹珠也被直接转发,因为距最后一个红色弹珠已经 1 秒了。

现在 浅蓝色 弹珠和其他一些弹珠在不到 1 秒的时间内出现,所以我们现在要缓冲这些弹珠,因为我们不想向网络发送可能成千上万的垃圾邮件请求 - 我们在这里要做的是缓冲 5 秒,然后发送我们缓冲的请求,每 5 秒发送一次,直到这个“突发”结束。在此之后,我们希望返回发送任何其他“个人”请求。

不一定完全像上面那样,基本上我们要

  • 个人交易(通过连接的客户端应用程序输入用户),立即发送(或一些非常小的延迟)

  • 检测我们何时开始获得大量交易,并让这个开始“节流”,然后以较长的时间间隔(例如 5 或 8 秒)发送缓冲的批处理

  • 不想丢弃任何交易(弹珠),我想将它们全部发送

我发现许多其他帖子与此相似,但不相同。

我想出了一些笨拙的“手动”方法来做到这一点(仅使用标准列表和各种计时器等),但我想知道使用 Rx 是否可以做到这一点这项工作,希望不太容易出现错误?

在此先感谢您的帮助!

最佳答案

我想你已经选好了,bufferdebounce 作为缓冲触发器,

when this client is, for example connected

如果你想添加一个连接事件,你也可以将它合并到 bufferTrigger 中。

console.clear()

const source = new Rx.Subject();
const bufferTrigger = source.debounceTime(500);

source
.buffer(bufferTrigger)
.subscribe(console.log);

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue'), 3200);
setTimeout(() => source.next('pink'), 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

在最后一次发射 5 秒后额外触发的示例

const source = new Rx.Subject();
const lastEmit = new Rx.Subject();
const maxDelayAfterLastEmit = lastEmit.delay(5000);
const bufferTrigger = source.debounceTime(500)
.merge(maxDelayAfterLastEmit);

const emits = source
.buffer(bufferTrigger)
.do(x => lastEmit.next(x))
.filter(x => x.length);

var start = new Date().getTime();
emits.subscribe(x => console.log((new Date().getTime() - start)/1000 + "s " + x));

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue1'), 3200);
setTimeout(() => source.next('blue2'), 3300);
setTimeout(() => source.next('blue3'), 3600);
setTimeout(() => source.next('pink1'), 4000);
setTimeout(() => source.next('pink2'), 4400);
setTimeout(() => source.next('pink3'), 4800);
setTimeout(() => source.next('pink4'), 5200);
setTimeout(() => source.next('pink5'), 5600);
setTimeout(() => source.next('pink6'), 6000);
setTimeout(() => source.next('pink7'), 6400);
setTimeout(() => source.next('pink8'), 6800);
setTimeout(() => source.next('pink9'), 7200);
setTimeout(() => source.next('pink10'), 7700);
setTimeout(() => source.next('pink11'), 8200);
setTimeout(() => source.next('pink12'), 8600);
setTimeout(() => source.next('pink13'), 9000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

关于rxjs - 接收 : Buffer with variable back pressured buffer time,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47985759/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com