akka引用随着play框架不断增加

bq9c1y66  于 2021-07-12  发布在  Java
关注(0)|答案(0)|浏览(255)

几周前,我将应用程序中的所有多线程操作都更改为akka。但是,由于我似乎开始耗尽堆空间(大约一周后)。
基本上看所有演员

ActorSelection selection = getContext().actorSelection("/*");

演员的数量似乎一直在增加。经过一个小时的跑步,我有2200多。它们被称为:

akka://application/user/$Aic
akka://application/user/$Alb
akka://application/user/$Alc
akka://application/user/$Am
akka://application/user/$Amb

我还注意到,打开WebSocket(并关闭WebSocket)时,会出现以下情况:

akka://application/system/Materializers/StreamSupervisor-2/flow-21-0-unnamed
akka://application/system/Materializers/StreamSupervisor-2/flow-2-0-unnamed
akka://application/system/Materializers/StreamSupervisor-2/flow-27-0-unnamed
akka://application/system/Materializers/StreamSupervisor-2/flow-23-0-unnamed

我需要做些什么来关闭它们并让它们被清洗?
我不确定内存问题是否与此相关,但事实上,在生产服务器上一小时后似乎有太多内存问题。
[编辑:添加代码以分析/统计参与者]

public class RetrieveActors extends AbstractActor {

    private String identifyId;
    private List<String> list;

    public RetrieveActors(String identifyId) {
        Logger.debug("Actor retriever identity: " + identifyId);
        this.identifyId = identifyId;
    }

    @Override
    public Receive createReceive() {
        Logger.info("RetrieveActors");
        return receiveBuilder()
                .match(String.class, request -> {
                    //Logger.info("Message: " + request + " " + new Date());
                    if(request.equalsIgnoreCase("run")) {
                        list = new ArrayList<>();
                        ActorSelection selection = getContext().actorSelection("/*");
                        selection.tell(new Identify(identifyId), getSelf());
                        //ask(selection, new Identify(identifyId), 1000).thenApply(response -> (Object) response).toCompletableFuture().get();
                    } else if(request.equalsIgnoreCase("result")) {
                        //Logger.debug("Run list: " + list + " " + new Date());
                        sender().tell(list, self());
                    } else {
                        sender().tell("Wrong command: " + request, self());
                    }
                }).match(ActorIdentity.class, identity -> {
                    if (identity.correlationId().equals(identifyId)) {
                        ActorRef ref = identity.getActorRef().orElse(null);
                        if (ref != null) { // to avoid NullPointerExceptions
                            // Log or store the identity of the actor who replied
                            //Logger.info("The actor " + ref.path().toString() + " exists and has replied!");
                            list.add(ref.path().toString());
                            // We want to discover all children of the received actor (recursive traversal)
                            ActorSelection selection = getContext().actorSelection(ref.path().toString() + "/*");
                            selection.tell(new Identify(identifyId), getSelf());
                        }

                    }
                    sender().tell(list.toString(), self());
                }).build();
    }

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题