gpt4 book ai didi

scala - 连接数百个 RxScala Observables(每个都有数百万个事件要发出)的有效方法?

转载 作者:行者123 更新时间:2023-12-05 01:03:08 24 4
gpt4 key购买 nike

我有数据存储在磁盘上,每天有数百万条记录的文件。我有一个相对高效的反序列化器,它产生发出记录的 Observables,现在足够快(1.5M 条记录/秒)。

我现在想要的是连接这些 Observables,以便我可以获得连续多天的不间断数据流。当这么简单的事情奏效时,我很激动:

val nilObs: Observable[Record] = Observable.empty
val allObs = dates.map(reader.recordsForDate(_)).foldLeft(nilObs)(_ ++ _)

然而,上述结果导致 Observable 的吞吐量很差——有 200 个连接的 Observable,我看到的是 50-100k/s,而我期望的是 1.5M/s。

我还没有对其进行分析,但正在查看 https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java它对队列做了很多工作——我想知道我是否在一个队列上创建了 N 个队列,而不是 N 个 Observables?

有没有更有效的方式以这种方式连接 Observable?

最佳答案

肯定至少有一种更快的方法可以做到这一点:

val datesObs = Observable.from(dates.toIterable)
val allObs = datesObs.concatMap(reader.recordsForDate(_))

这具有预期的性能,所以一切都很好。

关于scala - 连接数百个 RxScala Observables(每个都有数百万个事件要发出)的有效方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25270400/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com