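I'm trying to propagate a message when an actor is killed.
This is based on the Akka death watch documentation: http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java
In ServiceActor I was waiting for a "kill" message, but I never actually send that message, so to receive the notification in ServiceActor I use: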
else if (msg instanceof Terminated) {
    final Terminated t = (Terminated) msg;
    if (t.getActor() == child) {
        lastSender.tell(Msg.TERMINATED, getSelf());
    }
} else {
    unhandled(msg);
}
I have set the duration to 10 milliseconds:
Duration.create(10, TimeUnit.MILLISECONDS)
But the message Msg.TERMINATED is never received in the onReceive method of HelloWorld:
@Override
public void onReceive(Object msg) {
    if (msg == ServiceActor.Msg.SUCCESS) {
        System.out.println("Success");
        getContext().stop(getSelf());
    } else if (msg == ServiceActor.Msg.TERMINATED) {
        System.out.println("Terminated");
    } else
        unhandled(msg);
}
How can I send a message to HelloWorld when ServiceActor fails?
Complete code:
package terminatetest;
import akka.Main;
public class Launcher {
public static void main(String args[]) {
String[] akkaArgsArray = new String[1];
akkaArgsArray[0] = "terminatetest.HelloWorld";
Main.main(akkaArgsArray);
}
}
package terminatetest;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
int counter = 0;
akka.actor.ActorSystem system = getContext().system();
final ActorRef greeter = getContext().actorOf(
Props.create(ServiceActor.class), String.valueOf(counter));
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
public void run() {
greeter.tell(PoisonPill.getInstance(), getSelf());
}
}, system.dispatcher());
greeter.tell("http://www.google.com", getSelf());
counter = counter + 1;
}
@Override
public void onReceive(Object msg) {
if (msg == ServiceActor.Msg.SUCCESS) {
System.out.println("Success");
getContext().stop(getSelf());
} else if (msg == ServiceActor.Msg.TERMINATED) {
System.out.println("Terminated");
} else
unhandled(msg);
}
}
package terminatetest;
import static com.utils.PrintUtils.println;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
public class ServiceActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child);
}
ActorRef lastSender = getContext().system().deadLetters();
public static enum Msg {
SUCCESS, FAIL, TERMINATED;
}
@Override
public void onReceive(Object msg) {
if (msg instanceof String) {
String urlName = (String) msg;
try {
long startTime = System.currentTimeMillis();
URL url = new URL(urlName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
StringBuilder out = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
out.append(line);
}
System.out.println("Connection successful to " + url);
System.out.println("Content is " + out);
long endTime = System.currentTimeMillis();
System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");
} catch (MalformedURLException mue) {
println("URL Name " + urlName);
System.out.println("MalformedURLException");
System.out.println(mue.getMessage());
mue.printStackTrace();
getSender().tell(Msg.FAIL, getSelf());
} catch (IOException ioe) {
println("URL Name " + urlName);
System.out.println("IOException");
System.out.println(ioe.getMessage());
ioe.printStackTrace();
System.out.println("Now exiting");
getSender().tell(Msg.FAIL, getSelf());
}
}
else if (msg instanceof Terminated) {
final Terminated t = (Terminated) msg;
if (t.getActor() == child) {
lastSender.tell(Msg.TERMINATED, getSelf());
}
} else {
unhandled(msg);
}
}
}
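Update: I'm now triggering the PoisonPill from within ServiceActor itself, in response to a "poisonPill" string message.
Update to ServiceActor: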
if (urlName.equalsIgnoreCase("poisonPill")) {
    this.getSelf().tell(PoisonPill.getInstance(), getSelf());
    getSender().tell(Msg.TERMINATED, getSelf());
}
Update to HelloWorld:
system.scheduler().scheduleOnce(
    Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
        public void run() {
            greeter.tell("poisonPill", getSelf());
        }
    }, system.dispatcher());
This displays the following output:
startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I've removed the content for brevity)
Total Time : 1268 milliseconds
Terminated
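The "poisonPill" message is sent after 10 milliseconds, yet in this case the actor lives for 1268 milliseconds. So why does the actor not terminate when the "poisonPill" message is sent? Is it because the timings are so short?
Updated code: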
package terminatetest;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class HelloWorld extends UntypedActor {
@Override
public void preStart() {
int counter = 0;
akka.actor.ActorSystem system = getContext().system();
final ActorRef greeter = getContext().actorOf(
Props.create(ServiceActor.class), String.valueOf(counter));
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
public void run() {
greeter.tell("poisonPill", getSelf());
}
}, system.dispatcher());
greeter.tell("http://www.google.com", getSelf());
counter = counter + 1;
}
@Override
public void onReceive(Object msg) {
if (msg == ServiceActor.Msg.SUCCESS) {
System.out.println("Success");
getContext().stop(getSelf());
} else if (msg == ServiceActor.Msg.TERMINATED) {
System.out.println("Terminated");
} else
unhandled(msg);
}
}
package terminatetest;
import static com.utils.PrintUtils.println;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
public class ServiceActor extends UntypedActor {
ActorRef lastSender = getSender();
public static enum Msg {
SUCCESS, FAIL, TERMINATED;
}
@Override
public void onReceive(Object msg) {
if (msg instanceof String) {
String urlName = (String) msg;
if (urlName.equalsIgnoreCase("poisonPill")) {
this.getSelf().tell(PoisonPill.getInstance(), getSelf());
getSender().tell(Msg.TERMINATED, getSelf());
}
else {
try {
long startTime = System.currentTimeMillis();
System.out.println("startTime : "+startTime);
URL url = new URL(urlName);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.connect();
BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
StringBuilder out = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
out.append(line);
}
System.out.println("Connection successful to " + url);
System.out.println("Content is " + out);
long endTime = System.currentTimeMillis();
System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");
} catch (MalformedURLException mue) {
println("URL Name " + urlName);
System.out.println("MalformedURLException");
System.out.println(mue.getMessage());
mue.printStackTrace();
getSender().tell(Msg.FAIL, getSelf());
} catch (IOException ioe) {
println("URL Name " + urlName);
System.out.println("IOException");
System.out.println(ioe.getMessage());
ioe.printStackTrace();
System.out.println("Now exiting");
getSender().tell(Msg.FAIL, getSelf());
}
}
}
}
}
1 Answer
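I think your problem stems from the fact that you set lastSender only once, during construction of ServiceActor, and you explicitly set it to dead letters. You need to set lastSender to the current sender() when a message arrives; otherwise your Msg.TERMINATED will always go to dead letters.
Edit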
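I see the real problem now. In the HelloWorld actor, you are sending a PoisonPill to ServiceActor. As a result, ServiceActor stops itself, which also stops child (because it is a child actor of ServiceActor). At this point you might expect the Terminated message to be delivered to ServiceActor, since it explicitly watches child (and it may well be sent). But you have already sent a PoisonPill to ServiceActor, so it will not process any message that arrives after the PoisonPill (which the Terminated message would), and therefore the else if (msg instanceof Terminated) block in ServiceActor is never hit.
Edit 2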
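Your actor first receives the request to hit Google and then, 10 milliseconds later, the "poisonPill" message. Because an actor processes its mailbox in order, it fully processes the Google request before it processes the message telling it to stop. That is why the actor does not stop after 10 milliseconds: you cannot stop an actor in the middle of what it is doing.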
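The in-order mailbox behaviour described in both edits can be modelled with a small plain-Java sketch. This is not Akka code: MailboxModel, STOP, and the message strings are hypothetical names made up for illustration, and the model only assumes what the answer states, namely that messages are handled one at a time in arrival order and that nothing queued behind the stop message is handled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified, single-threaded model of an actor mailbox. NOT Akka code. */
class MailboxModel {
    // Hypothetical stand-in for a stop request such as PoisonPill.
    static final String STOP = "STOP";

    final Queue<String> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    /** Enqueues a message; nothing is processed until run() is called. */
    void tell(String msg) {
        mailbox.add(msg);
    }

    /** Drains the mailbox one message at a time, in arrival order. */
    void run() {
        boolean stopped = false;
        while (!mailbox.isEmpty()) {
            String msg = mailbox.poll();
            if (stopped) {
                // Anything queued behind STOP is never handled.
                deadLetters.add(msg);
            } else if (msg.equals(STOP)) {
                log.add("stopped");
                stopped = true;
            } else {
                log.add("start " + msg);
                // A slow handler (for example an HTTP request) would block
                // here; STOP stays in the queue until the handler returns.
                log.add("finish " + msg);
            }
        }
    }

    public static void main(String[] args) {
        MailboxModel actor = new MailboxModel();
        actor.tell("http://www.google.com");   // arrives first
        actor.tell(STOP);                      // arrives "10 ms later"
        actor.tell("message sent after STOP"); // arrives last
        actor.run();
        System.out.println("handled:      " + actor.log);
        System.out.println("dead letters: " + actor.deadLetters);
    }
}
```

In this model the stop request is only seen once the first handler has finished, no matter how early it was enqueued, and the message behind it ends up undelivered. That matches the two observations in the question: the actor outlives the 10 millisecond delay, and the Terminated branch never runs.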