gpt4 book ai didi

scala - Scala/Akka Streams 中的元素分组

转载 作者:行者123 更新时间:2023-12-03 19:37:54 24 4
gpt4 key购买 nike

假设我有一个不同水果的来源,我想将它们的数量插入到数据库中。

我可以做这样的事情:

Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}

但这显然很慢——当我可以将它们分组时,为什么要将每个项目都插入到数据库中?所以我想出了一个更好的解决方案:
Flow[Fruits]
.grouped(10000)
.map { items =>
insertItemsToDatabase(items)
}

但这意味着我必须持有 10 000 个元素 [banana, orange, orange, orange, banana, ...]在内存中,直到它们被刷新到数据库。这不是效率低下吗?也许我可以做这样的事情:
Flow[Fruits]
.grouped(100)
.map { items =>
consolidate(items) // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
insertMapToDatabase(mapOfItems)
}

根据我的理解,这也应该一次处理 10 000 个元素,但不应该占用太多内存(假设元素经常重复)。但是每个键仍然在内存中重复 100 次。我当然可以做 .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map() ......但是没有更好的方法吗?也许是这样的:
Flow[Fruits]
.map { item =>
addToMap(item)
if(myMap.length == 10000) {
insertToDatabase(myMap)
clearMyMap()
}
}

但它是否打破了 Akka 流的概念,即处理阶段的独立性(以及并发性)?

最佳答案

如果Fruit的基数set 较低,那么您可以保留一个包含所有计数的奇异 Map,然后在流式处理所有 Fruit 值后将其刷新到数据库中。

首先,构建一个将保持运行计数的 Flow:

type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount =
Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount =
(fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
Flow[Fruit].scan(zeroCount)(appendFruitToCount)

现在创建一个接收最后一个 FruitCount 的接收器并具体化流:
val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] =
fruitSource
.via(fruitCountFlow)
.to(lastFruitCountSink)
.run()
lastFruitCountFut然后可用于将值发送到数据库:
lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

Iterator之所以使用它,是因为它是构建 TraversableOnce 的内存效率最高的集合。水果项目。

此解决方案只会保留 1 Map在内存中,每个不同的水果类型都有 1 个键,每个键有 1 个整数。

关于scala - Scala/Akka Streams 中的元素分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45792909/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com