- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
背景
如 this question 中所述,我正在使用 Scalaz 7 iteratees 来处理常量堆空间中的大型(即无界)数据流。
我的代码如下所示:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
问题
我似乎遇到了内存泄漏,但我对 Scalaz/FP 不够熟悉,不知道该错误是在 Scalaz 中还是在我的代码中。直观上,我预计此代码仅需要(大约)P 乘以 Chunk
大小的空间。
注意:我发现a similar question其中遇到了 OutOfMemoryError
,但我的代码未使用 consume
。
测试
我运行了一些测试来尝试找出问题。总而言之,只有同时使用 zipWithIndex
和 group
时才会出现泄漏。
// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
测试代码:
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._
// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
Iteratee.enumIterator[Array[Int], IO](
Iterator.continually(Array.fill(sz)(0)).take(n))
// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
(c, a) => c + a.length
}
// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
(c, as) => c + as.map(_.length).sum
}
// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
(c, vs) => c + vs.map(_._1.length).sum
}
// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
(c, v) => c + v._1.length
}
问题
最佳答案
对于那些坚持使用旧版 iteratee
API 的人来说,这并不算什么安慰,但我最近验证了一个针对 scalaz-stream API 的等效测试通过了。 。这是一个较新的流处理 API,旨在取代 iteratee
。
为了完整起见,这里是测试代码:
// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
(Process emit Array.fill(sz)(0)).repeat take n
(streamArrs(1 << 25, 1 << 14).zipWithIndex
pipe process1.chunk(4)
pipe process1.fold(0L) {
(c, vs) => c + vs.map(_._1.length.toLong).sum
}).runLast.run
这应该适用于 n
参数的任何值(前提是您愿意等待足够长的时间)——我使用 2^14 32MiB 数组(即总共半个 TiB)进行了测试随着时间的推移分配的内存)。
关于scala - 使用 Scalaz 7 zipWithIndex/group enumeratees 避免内存泄漏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19128856/
我想创建一个 Play 2 Enumeratee,它接收值并输出它们,每个 x 将它们分块在一起。秒/毫秒。这样,在具有大量用户输入的多用户 websocket 环境中,可以限制每秒接收的帧数。 我知
我想要一个 Enumeratee,它将 Enumerator 中的元素分组到最大大小的组中,其中组中的所有元素都具有相同的值。因此它将扫描 Enumerator 并提取值,只要它们的值相同。当它达到一
什么是标准的转换方式Enumerator至 Future[List]在 Play Scala 框架中? 最佳答案 您可以使用方法 run参数类型为 Iteratee[E, List[E]]获取 Fut
我想写一个枚举对象,它只是将一个 Input.El 插入迭代对象,然后返回剩余的迭代对象。我称它为前置,因为它通过在它前面加上一个值来改变流。这是我的尝试: object Enumeratees {
背景 如 this question 中所述,我正在使用 Scalaz 7 iteratees 来处理常量堆空间中的大型(即无界)数据流。 我的代码如下所示: type ErrorOrT[M[+_],
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
我是一名优秀的程序员,十分优秀!