gpt4 book ai didi

scala - "tee"Scala 流/迭代器

转载 作者:行者123 更新时间:2023-12-01 05:51:17 24 4
gpt4 key购买 nike

我有一个表示为简单迭代器(或流)的顺序数据源。数据很大,不适合内存。此外,源可以遍历一次并且获取成本很高。
此源用于一些繁重的过程(黑盒),该过程将迭代器(或流)作为其参数来消耗线性数据。
好的,这很简单。但是如果我有两个不同的这样的消费程序,我该怎么办?正如我所说,我不想像 List 那样将输入数据吸入集合中。我也可以通过从一开始就重新阅读源代码两次来完成我的任务,但我不喜欢这样,因为它无效。
如果事实上我需要“tee”(一种克隆)迭代器(或流)以通过两个并行进程使用单个迭代器两次,而不将其缓存到内存集合中。我想如果这种方法消耗源流太快,它应该会产生背压或限制 sibling 。有效的解决方案也许应该有一些并行安全的队列缓冲区。
有谁知道如何在 Scala(或使用任何外部流库/框架)上进行此类操作?

PS我发现了一个4岁的类似问题:
One upstream stream feeding multiple downstream streams
不同之处在于我问如何使用标准 Scala 迭代器(或流)或更好的一些现有库来执行它。

最佳答案

您应该查看 fs2 streams .该示例使用常量内存从一个文件中读取并以增量方式写入另一个文件。以下是如何修改他们的示例以写入两个文件:

...

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => !s.trim.isEmpty && !s.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
.through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))

...

关于scala - "tee"Scala 流/迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53868321/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com