gpt4 book ai didi

scala - 实现弱引用的 Eventbus 参与者?

转载 作者:行者123 更新时间:2023-12-04 16:11:27 28 4
gpt4 key购买 nike

说未引用的参与者仍然订阅事件流是否正确?至少,这是我从 Akka 实验中得到的……

我正在尝试在 EventBus 场景中为参与者实现弱引用。在这些情况下,事件监听器/参与者通常来来去去。与应该一直在场的独立 Actor 不同。显式取消注册当然有效。但我并不总是能够察觉到合适的时机来做这件事。

Akka 在这样的用例中提供了吗?

val as = ActorSystem.create("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor,classOf[Exception])

// an event is published & received
as.eventStream.publish(new KnownProblem)

//session expires or whatever that makes the actor redundant
actor = null
(1 to 30).foreach(_ => System.gc)

// an event is published & STILL received
as.eventStream.publish(new KnownProblem)

最佳答案

好吧,我实际上无法实现它,但是 actor 正在停止 GC。使用 Scala 2.9.2 (REPL) + Akka 2.0.3。
EventBusWeakReference[ActorRef]没有帮助 - 因为在 Akka 你也有一个 dungeonChildrenContainer ( self.children ),也可能有 Monitor订阅生命周期事件。我没有尝试的事情 - 用调度员创建 Actor ,只知道我们新的 Shiny WeakEventBus - 所以也许我没捕获重点?

这是 REPL 的代码(从适当的导入开始,然后 :paste 分两步):

// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

case class Message(val id:String,val timestamp: Long)
case class PostMessage(
override val id:String=UUID.randomUUID().toString(),
override val timestamp: Long=new java.util.Date().getTime(),
text:String) extends Message(id, timestamp)
case class MessageEvent(val channel:String, val message:Message)

case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef)
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean,
gcFlag: WeakReference[AnyRef])

trait WeakLookupClassification { this: WeakEventBus ⇒
protected final val subscribers = new Index[Classifier,
WeakReference[Subscriber]](mapSize(),
new Comparator[WeakReference[Subscriber]] {
def compare(a: WeakReference[Subscriber],
b: WeakReference[Subscriber]): Int = {
if (a.get == None || b.get == None) -1
else compareSubscribers(a.get.get, b.get.get)
}
})
protected def mapSize(): Int
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
protected def classify(event: Event): Classifier
protected def publish(event: Event, subscriber: Subscriber): Unit
def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
subscribers.put(to, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
subscribers.remove(from, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber): Unit =
subscribers.removeValue(new WeakReference(subscriber))
def publish(event: Event): Unit = {
val i = subscribers.valueIterator(classify(event))
while (i.hasNext) publish(event, i.next().get.get)
}
}

class WeakEventBus extends EventBus with WeakLookupClassification {
type Event = MessageEvent
type Classifier=String
type Subscriber = ActorRef

protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b

protected def mapSize(): Int = 10
protected def classify(event: Event): Classifier = event.channel
protected def publish(event: Event, subscriber: Subscriber): Unit =
subscriber ! event
}

lazy val weakEventBus = new WeakEventBus

implicit val timeout = akka.util.Timeout(1000)
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString("""
akka {
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
}
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-sent-messages = on
log-received-messages = on
}
}
serverconf {
include "common"
akka {
actor {
deployment {
/root {
remote = "akka://serversys@127.0.0.1:2552"
}
}
}
remote {
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
}
""").getConfig("serverconf"))

class Server extends Actor {
private[this] val scheduled = new AtomicBoolean(false)
private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

val gcCheckPeriod = Duration(5000, "millis")

override def preRestart(reason: Throwable, message: Option[Any]) {
self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
super.preRestart(reason, message)
}

def schedule(period: Duration, who: ActorRef) =
actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)

def receive = {
case StartServer(nodeName) =>
sender ! ServerStarted(nodeName, self)
if (scheduled.compareAndSet(false, true))
schedule(gcCheckPeriod, self)
val gcFlagObj = new AnyRef()
gcFlagRef.set(new WeakReference(gcFlagObj))
weakEventBus.subscribe(self, nodeName)
actorSystem.eventStream.unsubscribe(self)
case GcCheck =>
val gcFlag = gcFlagRef.get
if (gcFlag == null) {
sys.error("gcFlag")
}
gcFlag.get match {
case Some(gcFlagObj) =>
scheduled.set(true)
schedule(gcCheckPeriod, self)
case None =>
println("Actor stopped because of GC: " + self)
context.stop(self)
}
case GcCheckScheduled(isScheduled, gcFlag) =>
if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
gcFlagRef.compareAndSet(null, gcFlag)
schedule(gcCheckPeriod, self)
}
case IsAlive(nodeName) =>
println("Im alive (default EventBus): " + nodeName)
sender ! AmAlive(nodeName, self)
case e: MessageEvent =>
println("Im alive (weak EventBus): " + e)
}
}

// :paste 2/2
class Root extends Actor {
def receive = {
case start @ StartServer(nodeName) =>
val server = context.actorOf(Props[Server], nodeName)
context.watch(server)
Await.result(server ? start, timeout.duration)
.asInstanceOf[ServerStarted] match {
case started @ ServerStarted(nodeName, _) =>
sender ! started
case _ =>
throw new RuntimeException(
"[S][FAIL] Could not start server: " + start)
}
case isAlive @ IsAlive(nodeName) =>
Await.result(context.actorFor(nodeName) ? isAlive,
timeout.duration).asInstanceOf[AmAlive] match {
case AmAlive(nodeName, _) =>
println("[S][SUCC] Server is alive : " + nodeName)
case _ =>
throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
}
case isAliveWeak @ IsAliveWeak(nodeName) =>
actorSystem.eventStream.publish(MessageEvent(nodeName,
PostMessage(text="isAlive-default")))
weakEventBus.publish(MessageEvent(nodeName,
PostMessage(text="isAlive-weak")))
}
}

lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

object Root {
def start(nodeName: String) = {
val msg = StartServer(nodeName)
var startedActor: Option[ActorRef] = None
Await.result(rootActor ? msg, timeout.duration)
.asInstanceOf[ServerStarted] match {
case succ @ ServerStarted(nodeName, actor) =>
println("[S][SUCC] Server started: " + succ)
startedActor = Some(actor)
case _ =>
throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
}
startedActor
}
def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)
def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")

关于scala - 实现弱引用的 Eventbus 参与者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14121833/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com