java - Message not sent when killing an Akka actor using DeathWatch

Reposted. Author: 行者123. Updated: 2023-12-01 12:26:59

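I am trying to send a message when an actor is killed.

This is based on the Akka DeathWatch documentation: http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java

In ServiceActor I am waiting for a "kill" message, but I never actually send that message. So, to receive the message in ServiceActor, I use: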

else if (msg instanceof Terminated) {
    final Terminated t = (Terminated) msg;
    if (t.getActor() == child) {
        lastSender.tell(Msg.TERMINATED, getSelf());
    }
} else {
    unhandled(msg);
}

I have set the duration to 10 milliseconds:

Duration.create(10, TimeUnit.MILLISECONDS)

But the message Msg.TERMINATED is never received in the onReceive method of HelloWorld:

@Override
public void onReceive(Object msg) {
    if (msg == ServiceActor.Msg.SUCCESS) {
        System.out.println("Success");
        getContext().stop(getSelf());
    } else if (msg == ServiceActor.Msg.TERMINATED) {
        System.out.println("Terminated");
    } else {
        unhandled(msg);
    }
}

How can I send a message to HelloWorld when ServiceActor fails?

The full code:

package terminatetest;

import akka.Main;

public class Launcher {

    public static void main(String args[]) {
        String[] akkaArgsArray = new String[1];
        akkaArgsArray[0] = "terminatetest.HelloWorld";
        Main.main(akkaArgsArray);
    }

}

package terminatetest;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell(PoisonPill.getInstance(), getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else {
            unhandled(msg);
        }
    }
}

package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
    {
        this.getContext().watch(child);
    }

    ActorRef lastSender = getContext().system().deadLetters();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            try {
                long startTime = System.currentTimeMillis();
                URL url = new URL(urlName);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();

                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder out = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    out.append(line);
                }
                System.out.println("Connection successful to " + url);
                System.out.println("Content is " + out);
                long endTime = System.currentTimeMillis();
                System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

            } catch (MalformedURLException mue) {
                println("URL Name " + urlName);
                System.out.println("MalformedURLException");
                System.out.println(mue.getMessage());
                mue.printStackTrace();
                getSender().tell(Msg.FAIL, getSelf());
            } catch (IOException ioe) {
                println("URL Name " + urlName);
                System.out.println("IOException");
                System.out.println(ioe.getMessage());
                ioe.printStackTrace();
                System.out.println("Now exiting");
                getSender().tell(Msg.FAIL, getSelf());
            }
        } else if (msg instanceof Terminated) {
            final Terminated t = (Terminated) msg;
            if (t.getActor() == child) {
                lastSender.tell(Msg.TERMINATED, getSelf());
            }
        } else {
            unhandled(msg);
        }
    }

}

Update: I now trigger the poison pill from the child actor itself, as follows.

Updated ServiceActor:

if (urlName.equalsIgnoreCase("poisonPill")) {
    this.getSelf().tell(PoisonPill.getInstance(), getSelf());
    getSender().tell(Msg.TERMINATED, getSelf());
}

Updated HelloWorld:

system.scheduler().scheduleOnce(
        Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
            public void run() {
                greeter.tell("poisonPill", getSelf());
            }
        }, system.dispatcher());

This prints the following output:

startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I've removed the content for brevity)
Total Time : 1268 milliseconds
Terminated

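The poisonPill message is sent after 10 milliseconds, yet in this run the actor stayed alive for 1268 milliseconds. So why did the actor not terminate when the poison pill was sent? Is it because the delay is too short?

Updated code: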

package terminatetest;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell("poisonPill", getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else {
            unhandled(msg);
        }
    }
}


package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    ActorRef lastSender = getSender();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            if (urlName.equalsIgnoreCase("poisonPill")) {
                this.getSelf().tell(PoisonPill.getInstance(), getSelf());
                getSender().tell(Msg.TERMINATED, getSelf());
            } else {

                try {
                    long startTime = System.currentTimeMillis();
                    System.out.println("startTime : " + startTime);
                    URL url = new URL(urlName);
                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                    conn.connect();

                    BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                    StringBuilder out = new StringBuilder();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.append(line);
                    }
                    System.out.println("Connection successful to " + url);
                    System.out.println("Content is " + out);
                    long endTime = System.currentTimeMillis();
                    System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

                } catch (MalformedURLException mue) {
                    println("URL Name " + urlName);
                    System.out.println("MalformedURLException");
                    System.out.println(mue.getMessage());
                    mue.printStackTrace();
                    getSender().tell(Msg.FAIL, getSelf());
                } catch (IOException ioe) {
                    println("URL Name " + urlName);
                    System.out.println("IOException");
                    System.out.println(ioe.getMessage());
                    ioe.printStackTrace();
                    System.out.println("Now exiting");
                    getSender().tell(Msg.FAIL, getSelf());
                }
            }
        }
    }

}

Best answer

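I think your problem stems from the fact that you set lastSender only once, while ServiceActor is being constructed, and you explicitly set it to dead letters. If you want to send a message back to the actor that sent you the String message, you need to set lastSender to that sender(). If you do not, your Msg.TERMINATED will always end up in dead letters.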
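
To make the point about lastSender concrete, here is a small sketch in plain Java. It is not the Akka API: the Service class, the Consumer-based "mailboxes" and the deliveredTo helper are all hypothetical names invented for this illustration. It only models the difference between a reply target that is fixed at construction and one that is refreshed whenever a request arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model, NOT the Akka API: a reply target that is either
// fixed at construction or refreshed on every incoming request.
class LastSenderDemo {

    // Hypothetical miniature actor used only for this illustration.
    static class Service {
        private Consumer<String> lastSender; // where replies are sent
        private final boolean rememberSender;

        Service(Consumer<String> deadLetters, boolean rememberSender) {
            this.lastSender = deadLetters; // set once, as in the question
            this.rememberSender = rememberSender;
        }

        void onReceive(String msg, Consumer<String> sender) {
            if (msg.equals("Terminated")) {
                // Notification path: reply to whoever was remembered.
                lastSender.accept("TERMINATED");
            } else if (rememberSender) {
                // The suggested fix: remember the requester on each request.
                lastSender = sender;
            }
        }
    }

    // Returns "sender" if the requester received TERMINATED, else "deadLetters".
    static String deliveredTo(boolean rememberSender) {
        List<String> deadLetters = new ArrayList<>();
        List<String> requesterInbox = new ArrayList<>();
        Service service = new Service(deadLetters::add, rememberSender);

        service.onReceive("http://www.google.com", requesterInbox::add);
        service.onReceive("Terminated", deadLetters::add);

        return requesterInbox.contains("TERMINATED") ? "sender" : "deadLetters";
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + deliveredTo(false));
        System.out.println("with fix:    " + deliveredTo(true));
    }
}
```

In the real actor, the equivalent change would presumably be to assign lastSender = getSender() inside the branch that handles the String message, before doing any work.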

Edit

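I see the real problem now. In the HelloWorld actor you are sending a PoisonPill to ServiceActor. ServiceActor will stop itself as a result, which in turn stops child (because it is a child actor of ServiceActor). At this point you might expect the Terminated message to be delivered to ServiceActor, since it explicitly watches child (and it may indeed be delivered), but you have already sent ServiceActor a PoisonPill, so it will not process any message received after that one (which is where the Terminated would be). That is why this block:

else if (msg instanceof Terminated) {

in ServiceActor is never hit.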
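
The reasoning above can be sketched with a toy mailbox in plain Java. This is a simplified model of the argument, not Akka's implementation: the process method and the string messages are assumptions made for illustration. The loop handles messages in arrival order and stops as soon as it reaches the poison pill, so anything queued behind it is never handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy mailbox, NOT Akka internals: messages queued behind a poison pill
// are never handed to the message handler.
class PoisonPillMailboxDemo {

    static final String POISON_PILL = "PoisonPill";

    // Returns the messages that the handler actually got to see.
    static List<String> process(List<String> messages) {
        Queue<String> mailbox = new ArrayDeque<>(messages);
        List<String> handled = new ArrayList<>();
        while (!mailbox.isEmpty()) {
            String msg = mailbox.poll();
            if (msg.equals(POISON_PILL)) {
                break; // the actor stops here; the rest of the mailbox is dropped
            }
            handled.add(msg);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> handled = process(
                Arrays.asList("http://www.google.com", POISON_PILL, "Terminated"));
        System.out.println(handled); // prints [http://www.google.com]
    }
}
```

If the termination notice has to reach HelloWorld, one option consistent with this reasoning is for HelloWorld itself to watch ServiceActor, since HelloWorld is still running when ServiceActor stops.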

Edit 2

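Your actor first receives the request to hit Google, and then receives the "poisonPill" message (10 milliseconds later). Because an actor processes its mailbox in order, it fully processes the Google request before it processes the message telling it to stop itself. That is why the actor did not stop after 10 milliseconds. You cannot interrupt what an actor is currently doing.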
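
The same ordering can be reproduced without Akka. The sketch below uses a single-threaded ExecutorService as a stand-in for an actor that handles one message at a time. The 200 ms sleep is an assumed substitute for the HTTP request, and the class and event names are made up for this example. The stop message is submitted 10 ms after the slow request, yet it only runs once that request has finished.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// A single-threaded executor standing in for an actor's mailbox:
// one message is handled at a time, in submission order.
class SequentialMailboxDemo {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService actor = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // First message: slow work standing in for the HTTP request.
        actor.execute(() -> {
            events.add("request started");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("request finished");
        });

        // Second message: arrives 10 ms later and asks the "actor" to stop.
        scheduler.schedule(() -> actor.execute(() -> {
            events.add("stop handled");
            actor.shutdown();
        }), 10, TimeUnit.MILLISECONDS);

        try {
            // Wait until the stop message has been handled (or give up after 5 s).
            actor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded order is "request started", "request finished", "stop handled". If the work has to be cancellable, it would need to run outside the message handler (for example as an asynchronous task) so that the actor stays free to handle the stop message.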

Regarding "java - Message not sent when killing an Akka actor using DeathWatch", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26255802/
