gpt4 book ai didi

scala - 如何在 Akka Stream 中记录流速?

转载 作者:行者123 更新时间:2023-12-01 15:15:14 25 4
gpt4 key购买 nike

我有一个带有单个流/图的 Akka Stream 应用程序。我想测量源头的流速并每 5 秒记录一次,例如“在过去 5 秒内收到 3 条消息”。我试过,

someOtherFlow
.groupedWithin(Integer.MAX_VALUE, 5 seconds)
.runForeach(seq =>
log.debug(s"received ${seq.length} messages in the last 5 seconds")
)

但它只在有消息时输出,当有 0 条消息时没有空列表。我也想要0。这可能吗?

最佳答案

示例 akka 流日志记录。

  implicit val system: ActorSystem = ActorSystem("StreamLoggingActorSystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val adapter: LoggingAdapter = Logging(system, "customLogger")
implicit val ec: ExecutionContextExecutor = system.dispatcher

def randomInt = Random.nextInt()

val source = Source.repeat(NotUsed).map(_ ⇒ randomInt)


val logger = source
.groupedWithin(Integer.MAX_VALUE, 5.seconds)
.log(s"in the last 5 seconds number of messages received : ", _.size)
.withAttributes(
Attributes.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,
onFailure = Logging.DebugLevel
)
)

val sink = Sink.ignore

val result: Future[Done] = logger.runWith(sink)

result.onComplete{
case Success(_) =>
println("end of stream")
case Failure(_) =>
println("stream ended with failure")
}

源代码是 here .

关于scala - 如何在 Akka Stream 中记录流速?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41980353/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com