当Akka演员使用DeathWatch被杀死时,消息不会被发送

zc0qhyus  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(154)

我是想在一个演员被杀时传达一个信息。
这是基于Akka死亡观察文件:http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java
在serviceActor中,我在等待一个“kill”消息,但实际上我从来没有发送过这个消息,所以为了在ServiceActor中接收消息,我用途:

else if (msg instanceof Terminated) {
        final Terminated t = (Terminated) msg;
        if (t.getActor() == child) {
            lastSender.tell(Msg.TERMINATED, getSelf());
        }
    } else {
        unhandled(msg);
    }

我已将持续时间设置为10毫秒:

Duration.create(10, TimeUnit.MILLISECONDS)

但onReceive方法中从未接收到消息Msg.TERMINATED

@Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }

当ServiceActor失败时,如何向HelloWorld发送消息?
完整代码:

package terminatetest;
import akka.Main;

public class Launcher {

    public static void main(String args[]) {

        String[] akkaArgsArray = new String[1];

        akkaArgsArray[0] = "terminatetest.HelloWorld";

        Main.main(akkaArgsArray);

    }

}

package terminatetest;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell(PoisonPill.getInstance(), getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}

package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
    {
        this.getContext().watch(child);
    }

    ActorRef lastSender = getContext().system().deadLetters();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            try {
                long startTime = System.currentTimeMillis();
                URL url = new URL(urlName);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.connect();

                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder out = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    out.append(line);
                }
                System.out.println("Connection successful to " + url);
                System.out.println("Content is " + out);
                long endTime = System.currentTimeMillis();
                System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

            } catch (MalformedURLException mue) {
                println("URL Name " + urlName);
                System.out.println("MalformedURLException");
                System.out.println(mue.getMessage());
                mue.printStackTrace();
                getSender().tell(Msg.FAIL, getSelf());
            } catch (IOException ioe) {
                println("URL Name " + urlName);
                System.out.println("IOException");
                System.out.println(ioe.getMessage());
                ioe.printStackTrace();
                System.out.println("Now exiting");
                getSender().tell(Msg.FAIL, getSelf());
            }
        }

        else if (msg instanceof Terminated) {
                final Terminated t = (Terminated) msg;
                if (t.getActor() == child) {
                    lastSender.tell(Msg.TERMINATED, getSelf());
                }
            } else {
                unhandled(msg);
            }
    }

}

更新:我现在开始从儿童演员自己使用的poisonPill:
更新到服务参与者:

if (urlName.equalsIgnoreCase("poisonPill")) {   
    this.getSelf().tell(PoisonPill.getInstance(), getSelf());
    getSender().tell(Msg.TERMINATED, getSelf());
}

更新至HelloWorld:

system.scheduler().scheduleOnce(
        Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
            public void run() {
                greeter.tell("poisonPill", getSelf());
            }
        }, system.dispatcher());

这将显示以下输出:

startTime : 1412777375414
Connection successful to http://www.google.com
Content is ....... (I'veremoved the content for brevity)
Total Time : 1268 milliseconds
Terminated

poisonPill消息在10毫秒后发送,而在本例中,actor的寿命为1268毫秒。那么,为什么poisonPill消息发送后actor没有终止呢?这是因为时间太短了吗?
更新的代码:

package terminatetest;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart() {

        int counter = 0;

        akka.actor.ActorSystem system = getContext().system();

        final ActorRef greeter = getContext().actorOf(
                Props.create(ServiceActor.class), String.valueOf(counter));

        system.scheduler().scheduleOnce(
                Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() {
                    public void run() {
                        greeter.tell("poisonPill", getSelf());
                    }
                }, system.dispatcher());

        greeter.tell("http://www.google.com", getSelf());

        counter = counter + 1;
    }

    @Override
    public void onReceive(Object msg) {
        if (msg == ServiceActor.Msg.SUCCESS) {
            System.out.println("Success");
            getContext().stop(getSelf());
        } else if (msg == ServiceActor.Msg.TERMINATED) {
            System.out.println("Terminated");
        } else
            unhandled(msg);
    }
}

package terminatetest;

import static com.utils.PrintUtils.println;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;

public class ServiceActor extends UntypedActor {

    ActorRef lastSender = getSender();

    public static enum Msg {
        SUCCESS, FAIL, TERMINATED;
    }

    @Override
    public void onReceive(Object msg) {

        if (msg instanceof String) {
            String urlName = (String) msg;

            if (urlName.equalsIgnoreCase("poisonPill")) {   
                this.getSelf().tell(PoisonPill.getInstance(), getSelf());
                getSender().tell(Msg.TERMINATED, getSelf());
            }

            else {

                try {
                    long startTime = System.currentTimeMillis();
                    System.out.println("startTime : "+startTime);
                    URL url = new URL(urlName);
                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                    conn.connect();

                    BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                    StringBuilder out = new StringBuilder();
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.append(line);
                    }
                    System.out.println("Connection successful to " + url);
                    System.out.println("Content is " + out);
                    long endTime = System.currentTimeMillis();
                    System.out.println("Total Time : " + (endTime - startTime) + " milliseconds");

                } catch (MalformedURLException mue) {
                    println("URL Name " + urlName);
                    System.out.println("MalformedURLException");
                    System.out.println(mue.getMessage());
                    mue.printStackTrace();
                    getSender().tell(Msg.FAIL, getSelf());
                } catch (IOException ioe) {
                    println("URL Name " + urlName);
                    System.out.println("IOException");
                    System.out.println(ioe.getMessage());
                    ioe.printStackTrace();
                    System.out.println("Now exiting");
                    getSender().tell(Msg.FAIL, getSelf());
                }
            }
        }
    }

}
pdsfdshx

pdsfdshx1#

我认为您的问题源于这样一个事实:您只设置了lastSender一次,即在构造ServiceActor的过程中,并且显式地将其设置为deadletter。那么您需要将lastSender设置为那个sender(),否则将导致您的Msg.TERMINATED总是变成死信。

编辑

我现在看到了真实的的问题所在。在HelloWorld actor中,您正在向ServiceActor发送一个PoisonPill。结果,ServiceActor将自行停止,从而也停止了child ref(因为它是ServiceActor子参与者)。此时,您可能会认为Terminated消息将被传递到ServiceActor,因为它显式监视child(它可能确实被发送了),但是您已经向ServiceActor发送了一个PoisonPill,因此它不会处理在该消息(应该是Terminate)之后接收到的任何消息,因此该块:

else if (msg instanceof Terminated) {

ServiceActor中从未命中。

编辑2

你的参与者首先收到点击google的请求,然后收到"poisonPill"消息(10毫秒后)。当参与者按顺序处理它的邮箱时,参与者在处理停止自己的消息之前完全处理了点击google的请求。这就是为什么参与者在10毫秒后没有停止的原因。你不能在参与者正在做的事情的中间停止它。

相关问题