- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
说未引用的参与者仍然订阅事件流是否正确?至少,这是我从 Akka 实验中得到的……
我正在尝试在 EventBus 场景中为参与者实现弱引用。在这些情况下,事件监听器/参与者通常来来去去。与应该一直在场的独立 Actor 不同。显式取消注册当然有效。但我并不总是能够察觉到合适的时机来做这件事。
Akka 在这样的用例中提供了吗?
val as = ActorSystem.create("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor,classOf[Exception])
// an event is published & received
as.eventStream.publish(new KnownProblem)
//session expires or whatever that makes the actor redundant
actor = null
(1 to 30).foreach(_ => System.gc)
// an event is published & STILL received
as.eventStream.publish(new KnownProblem)
最佳答案
好吧,我实际上无法实现它,但是 actor 正在停止 GC。使用 Scala 2.9.2 (REPL) + Akka 2.0.3。EventBus
与 WeakReference[ActorRef]
没有帮助 - 因为在 Akka 你也有一个 dungeon
与 ChildrenContainer
( self.children
),也可能有 Monitor
订阅生命周期事件。我没有尝试的事情 - 用调度员创建 Actor ,只知道我们新的 Shiny WeakEventBus
- 所以也许我没捕获重点?
这是 REPL 的代码(从适当的导入开始,然后 :paste
分两步):
// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"
// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID
case class Message(val id:String,val timestamp: Long)
case class PostMessage(
override val id:String=UUID.randomUUID().toString(),
override val timestamp: Long=new java.util.Date().getTime(),
text:String) extends Message(id, timestamp)
case class MessageEvent(val channel:String, val message:Message)
case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef)
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean,
gcFlag: WeakReference[AnyRef])
trait WeakLookupClassification { this: WeakEventBus ⇒
protected final val subscribers = new Index[Classifier,
WeakReference[Subscriber]](mapSize(),
new Comparator[WeakReference[Subscriber]] {
def compare(a: WeakReference[Subscriber],
b: WeakReference[Subscriber]): Int = {
if (a.get == None || b.get == None) -1
else compareSubscribers(a.get.get, b.get.get)
}
})
protected def mapSize(): Int
protected def compareSubscribers(a: Subscriber, b: Subscriber): Int
protected def classify(event: Event): Classifier
protected def publish(event: Event, subscriber: Subscriber): Unit
def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
subscribers.put(to, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
subscribers.remove(from, new WeakReference(subscriber))
def unsubscribe(subscriber: Subscriber): Unit =
subscribers.removeValue(new WeakReference(subscriber))
def publish(event: Event): Unit = {
val i = subscribers.valueIterator(classify(event))
while (i.hasNext) publish(event, i.next().get.get)
}
}
class WeakEventBus extends EventBus with WeakLookupClassification {
type Event = MessageEvent
type Classifier=String
type Subscriber = ActorRef
protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
protected def mapSize(): Int = 10
protected def classify(event: Event): Classifier = event.channel
protected def publish(event: Event, subscriber: Subscriber): Unit =
subscriber ! event
}
lazy val weakEventBus = new WeakEventBus
implicit val timeout = akka.util.Timeout(1000)
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString("""
akka {
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
}
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-sent-messages = on
log-received-messages = on
}
}
serverconf {
include "common"
akka {
actor {
deployment {
/root {
remote = "akka://serversys@127.0.0.1:2552"
}
}
}
remote {
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
}
""").getConfig("serverconf"))
class Server extends Actor {
private[this] val scheduled = new AtomicBoolean(false)
private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()
val gcCheckPeriod = Duration(5000, "millis")
override def preRestart(reason: Throwable, message: Option[Any]) {
self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
super.preRestart(reason, message)
}
def schedule(period: Duration, who: ActorRef) =
actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)
def receive = {
case StartServer(nodeName) =>
sender ! ServerStarted(nodeName, self)
if (scheduled.compareAndSet(false, true))
schedule(gcCheckPeriod, self)
val gcFlagObj = new AnyRef()
gcFlagRef.set(new WeakReference(gcFlagObj))
weakEventBus.subscribe(self, nodeName)
actorSystem.eventStream.unsubscribe(self)
case GcCheck =>
val gcFlag = gcFlagRef.get
if (gcFlag == null) {
sys.error("gcFlag")
}
gcFlag.get match {
case Some(gcFlagObj) =>
scheduled.set(true)
schedule(gcCheckPeriod, self)
case None =>
println("Actor stopped because of GC: " + self)
context.stop(self)
}
case GcCheckScheduled(isScheduled, gcFlag) =>
if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
gcFlagRef.compareAndSet(null, gcFlag)
schedule(gcCheckPeriod, self)
}
case IsAlive(nodeName) =>
println("Im alive (default EventBus): " + nodeName)
sender ! AmAlive(nodeName, self)
case e: MessageEvent =>
println("Im alive (weak EventBus): " + e)
}
}
// :paste 2/2
class Root extends Actor {
def receive = {
case start @ StartServer(nodeName) =>
val server = context.actorOf(Props[Server], nodeName)
context.watch(server)
Await.result(server ? start, timeout.duration)
.asInstanceOf[ServerStarted] match {
case started @ ServerStarted(nodeName, _) =>
sender ! started
case _ =>
throw new RuntimeException(
"[S][FAIL] Could not start server: " + start)
}
case isAlive @ IsAlive(nodeName) =>
Await.result(context.actorFor(nodeName) ? isAlive,
timeout.duration).asInstanceOf[AmAlive] match {
case AmAlive(nodeName, _) =>
println("[S][SUCC] Server is alive : " + nodeName)
case _ =>
throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
}
case isAliveWeak @ IsAliveWeak(nodeName) =>
actorSystem.eventStream.publish(MessageEvent(nodeName,
PostMessage(text="isAlive-default")))
weakEventBus.publish(MessageEvent(nodeName,
PostMessage(text="isAlive-weak")))
}
}
lazy val rootActor = actorSystem.actorOf(Props[Root], "root")
object Root {
def start(nodeName: String) = {
val msg = StartServer(nodeName)
var startedActor: Option[ActorRef] = None
Await.result(rootActor ? msg, timeout.duration)
.asInstanceOf[ServerStarted] match {
case succ @ ServerStarted(nodeName, actor) =>
println("[S][SUCC] Server started: " + succ)
startedActor = Some(actor)
case _ =>
throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
}
startedActor
}
def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)
def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}
////////////////
// actual test
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")
关于scala - 实现弱引用的 Eventbus 参与者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14121833/
我一直在尝试编写我自己的弱/强指针,但我并不清楚其中的关系。我似乎遇到的所有事情都没有说清楚,而且一个医生经常会反驳另一个医生所说的话。任何人都可以详细解释弱/强指针关系,也许还有图像或代码示例吗?
静态/动态和强/弱类型之间有什么区别? 最佳答案 静态/动态类型涉及何时获取类型信息(在编译时或运行时) 强/弱类型是关于如何严格区分类型(例如,语言是否尝试从字符串到数字进行隐式转换)。 请参阅wi
我有一个非常奇怪的情况。我的服务器当前已关闭并收到 503 http 状态代码。基于如下给定的代码,代码进入 if 条件,但是当我将调试点置于 let error = self?.decodeErro
对于短期运行的操作,避免[weak self]是否可以接受?例如,URLSession 将保留 dataTask(with:completion:) 的闭包: final class ViewCont
我有一个非常奇怪的情况。我的服务器当前已关闭并收到 503 http 状态代码。基于如下给定的代码,代码进入 if 条件,但是当我将调试点置于 let error = self?.decodeErro
假设我有以下情况: Test1.java import java.lang.ref.WeakReference; public class Test1 { public WeakReferen
有没有办法告诉模拟器(我正在使用 Modelsim)当信号不是由任一双向接口(interface)驱动时将信号拉到弱“H”? 例如,如果我有一个 I2C 信号 I2C_SDA 被声明为来自 2 个模块
这是将一些值放入 WeakHashMap 中然后从映射中删除这些值的代码片段。它如何处理分配的内存? import java.util.*; public class WeakHashMap_Main
我正在尝试弄清楚智能指针可以实现什么。 但有一些感觉像是障碍。 普通指针有一个简短的定义 Someclass *p但是智能指针有点长shared_ptr p当您必须处理这些指针的模板(如 vector
这两行代码有区别吗? __weak IBOutlet UITextField *usernameField; @property (weak) IBOutlet UITextField *userna
我最近发现了 WeakHashMap Java 中的数据结构。 但是,我不明白它在不再正常使用时对映射进行垃圾收集是什么意思。数据结构如何知道我将不再在我的程序中使用 key ?如果长时间不引用 ke
我的问题是为什么 weak IBOutletCollection 总是 nil?如果将弱变强,我所有的按钮都在那里,这真的很奇怪。我试图理解苹果的逻辑,我看不出单个按钮和一组按钮在内存管理方面没有区别
我创建一个 WeakHashMap 为 WeakHashMap map = new WeakHashMap(); map.put(emp,"hello"); 其中 emp 是一个 Employee 对
在delphi sydney中,在对象(不是接口(interface))前面设置[weak]会受到惩罚吗?示例: TMyObject = class(Tobject) Private
在delphi sydney中,在对象(不是接口(interface))前面设置[weak]会受到惩罚吗?示例: TMyObject = class(Tobject) Private
众所周知,我们将声明一个可以打破强引用循环的弱委托(delegate)对象: // MyObject.h ... @property (nonatomic, weak) id delegate; ..
我已阅读this article关于Java中不同类型的引用(强引用、软引用、弱引用、幻像引用),但我不太理解。 这些引用类型之间有什么区别?每种类型何时使用? 最佳答案 Java 提供了两种不同类型
我突然想到...我相信弱引用的生命 与该引用的范围(在函数内或全局内)相关。 所以我想知道,只要我将数据处理保持在特定范围内,那么我应该可以使用 weak 与 strong 引用。正确的? 我问的原因
func addAdditionalElement(_ additionalSelectedElementsIDs: [String], startX: CGFloat, containerView:
我想要一个指针,以便我可以判断引用计数何时为 1。本质上,指针的工作方式类似于 weak_ptr,但清理工作需要手动进行。也就是说,程序每隔一段时间就会经历一个指针循环,并检查哪些指针只剩下一个引用。
我是一名优秀的程序员,十分优秀!