gpt4 book ai didi

scala - 与使用 ArrayBlockingQueue 的两个线程相比,Akka 流太慢

转载 作者:行者123 更新时间:2023-12-01 05:54:46 26 4
gpt4 key购买 nike

我正在解决将当前项目切换到 Akka 流时出现的性能问题。

在简化问题之后,似乎 Akka 流传递的消息比我预期的要少得多。

这里我有两段非常简单的代码,都只是一次向磁盘上的文件写入 10 个字节。

第一个使用两个线程和一个连接它们的 ArrayBlockingQueue:

val bw = Files.newBufferedWriter(Paths.get("test.txt"))
val target = "0123456789".toCharArray
val abq = new ArrayBlockingQueue[Array[Char]](10000)

new Thread(new Runnable {
override def run(): Unit = {
while (true) {
bw.write(abq.take())
}
}
}).start()

while (true) {
abq.put(target)
}

第二个使用 Akka 流:
implicit val system: ActorSystem = ActorSystem("TestActorSystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Source & Sink runs in two actors
// Both output of Source & input of Sink were buffered
Source
.repeat(ByteString("0123456789"))
.buffer(8192, OverflowStrategy.backpressure)
.async
.runWith(
FileIO
.toPath(Paths.get("test.txt"))
.withAttributes(Attributes.inputBuffer(8192, 8192))
)

我发现在我的测试机上,第一个写入文件的速度为 27.4MB/s,而第二个仅以 3.4MB/s 的速度写入文件。 thread-with-arrayBlockingQueue 比 Akka 快 8 倍。

我试图将 Sink 从 FileIO 更改为写入 BufferedWriter 的手写 Sink。这让第二个的速度增加到 5.5MB/s,但仍然比第一个慢 5 倍。

在我的理解中,Akka 流会有更好的表现
比较它达到现在。

在这种情况下,我做错了什么吗?

最佳答案

我想出了在这种情况下真正使它变慢的原因。

我已将问题中的 FileIO 接收器替换为带有一些时间计数器的手写接收器,以测量接收器中每一步的成本。

新水槽在这里:

final class FileWriteSink extends GraphStage[SinkShape[Array[Char]]] {

private val in: Inlet[Array[Char]] = Inlet("ArrayOfCharInlet")

override def shape: SinkShape[Array[Char]] = SinkShape.of(in)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
// note that the operations to these vars below are not thread-safe
// but it is fairly enough to show the time differences in a large scale with a relatively low cost
private var count = 0L

private var grabTime = 0L
private var writeTime = 0L
private var pullTime = 0L
private var gapTime = 0L
private var counterTime = 0L

private var lastTime = 0L
private var currTime = System.nanoTime()

@inline private def timeDiff(): Long = {
lastTime = currTime
currTime = System.nanoTime()
currTime - lastTime
}

private val bw = Files.newBufferedWriter(Paths.get("test.xml"))
setHandler(in, new InHandler {
override def onPush(): Unit = {
gapTime += timeDiff()
count += 1
if (count % 1000000 == 0) {
println(s"count: $count, gapTime: $gapTime, counterTime: $counterTime, grabTime: $grabTime, writeTime: $writeTime, pullTime: $pullTime")
println(s"count: $count, gapTime-avg: ${gapTime / count}, counterTime-avg: ${counterTime / count}, grabTime-avg: ${grabTime / count}, writeTime-avg: ${writeTime / count}, pullTime-avg: ${pullTime / count}")
}
counterTime += timeDiff()
val v = grab(in)
grabTime += timeDiff()
bw.write(v)
writeTime += timeDiff()
pull(in)
pullTime += timeDiff()
}
})

override def preStart(): Unit = {
pull(in)
}
}
}

}

然后我从我的测试环境中得到了这个日志:
count: 1000000, gapTime: 3220562882, counterTime: 273008576, grabTime: 264956553, writeTime: 355040917, pullTime: 260033342
count: 1000000, gapTime-avg: 3220, counterTime-avg: 273, grabTime-avg: 264, writeTime-avg: 355, pullTime-avg: 260
count: 2000000, gapTime: 6307318517, counterTime: 549671865, grabTime: 532654603, writeTime: 708526613, pullTime: 524305026
count: 2000000, gapTime-avg: 3153, counterTime-avg: 274, grabTime-avg: 266, writeTime-avg: 354, pullTime-avg: 262
count: 3000000, gapTime: 9403004835, counterTime: 821901662, grabTime: 797670212, writeTime: 1054416804, pullTime: 786163401
count: 3000000, gapTime-avg: 3134, counterTime-avg: 273, grabTime-avg: 265, writeTime-avg: 351, pullTime-avg: 262

事实证明,pull() 和下一个 onPush() 调用之间的时间间隔在这里非常缓慢。

即使缓冲区已满,Sink 也不需要等待源生成下一个元素。在我的测试环境中,两次 onPush() 调用之间仍有近 3 微秒的时间间隔。

所以我在这里应该期望的是 Akka 流将具有很大的整体吞吐量。而在设计实际流的结构时,需要仔细注意和处理两次 onPush() 调用之间的间隔时间。

关于scala - 与使用 ArrayBlockingQueue 的两个线程相比,Akka 流太慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50349340/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com