- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在做一个 Node 项目,需要提交数千张图片进行处理。在将这些图像上传到处理服务器之前,需要调整它们的大小,所以我有一些类似的东西:
imageList
.map(image => loadAndResizeImage)
.merge(3)
.map(image => uploadImage)
.merge(3)
.subscribe();
调整图像大小通常需要零点几秒,上传和处理大约需要 4 秒。
在等待上传队列清除时,如何防止内存中堆积数千张调整大小的图像?我可能想要调整 5 张图片的大小并等待上传,以便在图片上传完成后立即从队列中拉出下一张调整大小的图片并上传,并调整新图片的大小并将其添加到“缓冲区”。
可以在此处找到问题的说明:
https://jsbin.com/webaleduka/4/edit?js,console
这里有一个加载步骤(耗时 200 毫秒)和一个处理步骤(耗时 4 秒)。每个进程的并发数限制为 2。我们可以看到,使用 25 个初始项目,我们在内存中得到了 20 个图像。
我确实查看了缓冲区选项,但似乎都没有做我想做的事情。
目前我刚刚将加载、调整大小和上传合并到一个延迟的可观察对象中,我将其与最大并发性合并。不过,我想让图片等待上传,我相信这一定是可能的。
我正在使用 RXjs 4,但我想 5 的原理是一样的。
非常感谢。
最佳答案
在 RxJS 5 中我会这样做:
Observable.range(1, 25)
.bufferCount(5)
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(300))
.toArray();
})
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
})
.subscribe(val => {
// console.log('response');
console.log('response', val);
});
与 bufferCount
运算符 我将输入数组分成 5 个项目的批处理。然后首先使用第一个 concatMap()
处理每个批处理(我故意使用 concat 是因为我想等到嵌套的 Observable 完成)。然后将处理后的数据发送到另一个 concatMap()
,后者将其发送到您的服务器。
我正在使用两个 delay()
运算符来模拟不同的任务需要不同的时间。在我们的案例中,处理图像的速度非常快,因此第一个 concatMap
将比第二个 concatMap
能够将它们发送到服务器的速度更快地发送项目,这没问题。处理后的图片会堆叠在concatMap
中,并会一个接一个地分批发送。
此演示的输出如下所示:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
查看现场演示:https://jsbin.com/mileqa/edit?js,console
但是,如果您希望总是首先处理一个批处理而不是发送它,当它被发送而不是继续处理另一个批处理时,您必须将第二个内部 Observable 从 concatMap
移动到 toArray()
在第一个 concatMap()
调用中。
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(100))
.toArray()
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
});
})
查看现场演示:https://jsbin.com/sabena/2/edit?js,console
这会产生如下输出:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
可以看到“process”、“send batch”、“response”日志是有序的。
RxJS 4 中的实现应该几乎相同(只是运算符名称可能略有不同)。
在 RxJS 4 中还有 controlled()
operator这在 RxJS 5 中还不存在(还没有?)。我可能会做一些与您需要的非常相似的事情。
关于javascript - 了解 rxjs 中的背压 - 仅缓存 5 张等待上传的图像,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40843003/
我开始在 Ethereum blockchain 上了解如何开发智能合约以及如何写 web-script用于与智能合约交互(购买、销售、统计......)我得出了该怎么做的结论。我想知道我是否正确理解
我正在 UIView 中使用 CATransform3DMakeRotation,并且我正在尝试进行 45º,变换就像向后放置一样: 这是我拥有的“代码”,但显然没有这样做。 CATransform3
我目前正在测试 WebRTC 的功能,但我有一些脑逻辑问题。 WebRTC 究竟是什么? 我只读了“STUN”、“P2P”和其他...但是在技术方面什么是正确的 WebRTC(见下一个) 我需要什么
我在看 DelayedInit在 Scala in Depth ... 注释是我对代码的理解。 下面的 trait 接受一个非严格计算的参数(由于 => ),并返回 Unit .它的行为类似于构造函数
谁能给我指出一个用图片和简单的代码片段解释 WCF 的资源。我厌倦了谷歌搜索并在所有搜索结果中找到相同的“ABC”文章。 最佳答案 WCF 是一项非常复杂的技术,在我看来,它的文档记录非常少。启动和运
我期待以下 GetArgs.hs打印出传递给它的参数。 import System.Environment main = do args main 3 4 3 :39:1: Coul
private int vbo; private int ibo; vbo = glGenBuffers(); ibo = glGenBuffers(); glBindBuffer(GL_ARRAY_
我正在尝试一个 for 循环。我添加了一个 if 语句以在循环达到 30 时停止循环。 我见过i <= 10将运行 11 次,因为循环在达到 10 次时仍会运行。 如果有设置 i 的 if 语句,为什
我正在尝试了解 WSGI 的功能并需要一些帮助。 到目前为止,我知道它是一种服务器和应用程序之间的中间件,用于将不同的应用程序框架(位于服务器端)与应用程序连接,前提是相关框架具有 WSGI 适配器。
我是 Javascript 的新手,我正在尝试绕过 while 循环。我了解它们的目的,我想我了解它们的工作原理,但我在使用它们时遇到了麻烦。 我希望 while 值自身重复,直到两个随机数相互匹配。
我刚刚偶然发现Fabric并且文档并没有真正说明它是如何工作的。 我有根据的猜测是您需要在客户端和服务器端都安装它。 Python 代码存储在客户端,并在命令运行时通过 Fabric 的有线协议(pr
我想了解 ConditionalWeakTable .和有什么区别 class ClassA { static readonly ConditionalWeakTable OtherClass
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 5年前关闭。 Improve this questi
我还没有成功找到任何可以引导我理解 UIPickerView 和 UIPickerView 模型的好例子。有什么建议吗? 最佳答案 为什么不使用默认的 Apple 文档示例?这是来自苹果文档的名为 U
我在看foldM为了获得关于如何使用它的直觉。 foldM :: Monad m => (a -> b -> m a) -> a -> [b] -> m a 在这个简单的例子中,我只返回 [Just
答案What are _mm_prefetch() locality hints?详细说明提示的含义。 我的问题是:我想要哪一个? 我正在处理一个被重复调用数十亿次的函数,其中包含一些 int 参数。
我一直在读这个article了解 gcroot 模板。我明白 gcroot provides handles into the garbage collected heap 然后 the handle
提供了一个用例: 流处理架构;事件进入 Kafka,然后由带有 MongoDB 接收器的作业进行处理。 数据库名称:myWebsite集合:用户 并且作业接收 users 集合中的 user 记录。
你好 我想更详细地了解 NFS 文件系统。我偶然发现了《NFS 图解》这本书,不幸的是它只能作为谷歌图书提供,所以有些页面丢失了。有人可能有另一个很好的资源,这将是在较低级别上了解 NFS 的良好开始
我无法理解这个问题,哪个更随机? rand() 或: rand() * rand() 我发现这是一个真正的脑筋急转弯,你能帮我吗? 编辑: 凭直觉,我知道数学答案是它们同样随机,但我忍不住认为,如果您
我是一名优秀的程序员,十分优秀!