gpt4 book ai didi

scala - 使用 Akka Streams 管理共享状态的惯用方法

转载 作者:行者123 更新时间:2023-12-05 07:28:59 25 4
gpt4 key购买 nike

我需要根据一些可以在流程执行之外更改的黑名单来过滤我的流程。所以,我看到了两个选择:

在单独的服务中封装黑名单

    class Blacklist(init: Set[String]) {
@volatile private var set: Set[String] = init

def get: Set[String] = set
def update(newSet: Set[String]): Unit = {
set = newSet
}
}

val blacklist = new Blacklist(Set.empty)

Flow[String]
.filterNot(blacklist.get)

将黑名单封装在一个actor中

    class Blacklist extends Actor {
import Blacklist._
private var set = Set.empty[String]

override def receive: Receive = {
case UpdateBlacklist(newset: Set[String]) =>
set = newset
case GetBlacklist =>
sender ! set
}
}

object Blacklist {
case class UpdateBlacklist(set: Set[String])
case object GetBlacklist
}

val parallelism: Int = ???
val blacklist = system.actorOf(Props(new Blacklist()))

Flow[String]
.mapAsync(parallelism) { str =>
val ask = blacklist ? Blacklist.GetBlacklist
ask.mapTo[Set[String]] map { str -> _ }
} filterNot { case (str, exclude) =>
exclude(str)
}

恐怕带有 mapAsync 的 actor holder 解决方案会引入新的异步边界,从而防止运算符融合。那么,我应该更喜欢哪一个?还是有更惯用的方式?

最佳答案

我认为在您的情况下,单独的服务解决方案更为合理。这是一个干净的解决方案,避免了 actor 开销。

但是,您的代码没有按预期工作。您将不可变的 Set 传递给 filterNot 而不是函数。您应该考虑直接使用同步集(基于 ConcurrentHashMap,如 in this answer 所述)或在您的黑名单上实现 apply,如下所示:

class Blacklist(init: Set[String]) {
val blacklist = new AtomicReference[Set[String]](init)

def apply(s: String) = (blacklist.get) (s)

def update(newSet: Set[String]): Unit = {
blacklist.set(newSet)
}
}

val blacklist = new Blacklist(Set.empty)

Flow[String].filterNot(blacklist)

我建议始终使用 AtomicReference 而不是 @volatile,因为它更明确和易于理解。

关于scala - 使用 Akka Streams 管理共享状态的惯用方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53060480/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com