Akka actors stop receiving messages after the same actor class is used several times

Posted by lf5gs5x2 on 2021-06-07 in Kafka

Some instances of an Akka actor are not receiving any messages.
This is our actor. Once it receives a message, it runs a Kafka consumer.

package package1.akka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import package1.kafka.SimpleConsumer;
import package1.utils.StringUtils;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import akka.actor.ActorSelection;
import akka.actor.UntypedActor;

@Component
@Scope("prototype")
public class TestAKKAActor  extends UntypedActor{

    private @Value("${kafka}") String kafka;

    ActorSelection messageParentActor = AkkaFactory
            .getActorSystem(SystemType.LOCAL)
            .actorSelection("akka://AKKASystem/user/messageParentActor");

    Logger logger = org.slf4j.LoggerFactory.getLogger(TestAKKAActor.class);

    @Override
    public void onReceive(Object arg0) throws Exception {
        System.out.println("hi! message recieved");
        processAkkaMessage("x"+StringUtils.generateRandomKey(5));
        if(arg0.equals(Msg.DONE)){
            context().system().shutdown();
        }

    }

    public void processAkkaMessage(Object arg0){

        // topic has to be 16 digit
        String topic = arg0.toString();
        String randomFiller = StringUtils.generateRandomKey(4)
                + StringUtils.generateRandomKey(4);
        String group = "c" + randomFiller;
        logger.info("Consumer Activating for >> {}", topic);
        SimpleConsumer csc = new SimpleConsumer(kafka+ ":2181", group);

        ConsumerConnector consumerConnector = csc.getConsumerConnector();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, Integer.valueOf(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
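        // hasNext() blocks until a message arrives (unless consumer.timeout.ms is set),
        // so this loop keeps the thread that is running the actor busy.
        while (it.hasNext()) {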

            String newString = new String(it.next().message());

            try {
                System.out.println(newString);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

        context().stop(getSelf());
    }

}

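The class below has a test() method that creates 5 different references to the same actor and uses a for loop to start 5 different consumers. test() is triggered through a RESTful API. However, only the first three actors receive the message. The actors after that receive nothing and therefore never start a new consumer. I need to start as many consumers as possible, but I cannot seem to get past three consumer threads.
We also looked into memory problems, but that does not seem to be the issue.
If we remove the consumer code, the actors run fine and every spawned actor receives its message. All 5 of them.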

package package1.service;

import package1.akka.AkkaFactory;
import package1.akka.SystemType;
import package1.config.SpringExtension;
import package1.utils.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import akka.actor.ActorRef;

@Component
public class TestManager {

    @Autowired
    SpringExtension ext;

    public void test() {
        for (int i = 1; i <= 5; i++) {

            // Generate a random string for the Akka actor name.
            String actorName = "c" + StringUtils.generateRandomKey(5);

            ActorRef testACTOR = AkkaFactory.getActorSystem(SystemType.LOCAL)
                    .actorOf(ext.props("testAKKAActor"), actorName);

            testACTOR.tell("MESSAGE", ActorRef.noSender());
        }

    }
}
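
One possible explanation for the symptom, not confirmed by anything in this post, is thread starvation: onReceive never returns while the consumer loop is waiting, so each running actor holds on to one thread of the pool that delivers messages. If that pool only had three threads available, the fourth and fifth actors would never be scheduled. The sketch below reproduces that effect with plain java.util.concurrent instead of Akka. The class name BlockingStarvationDemo, the method startedWhileBlocked and the pool size of 3 are made up for illustration and are not taken from the application's configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it does not use Akka or Kafka.
public class BlockingStarvationDemo {

    /**
     * Submits `tasks` blocking jobs to a pool of `poolSize` threads and
     * returns how many of them had started while all were still blocked.
     */
    static int startedWhileBlocked(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch firstWave = new CountDownLatch(Math.min(poolSize, tasks));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.incrementAndGet();   // stands in for "hi! message recieved"
                    firstWave.countDown();
                    try {
                        release.await();         // stands in for the blocking consumer loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Wait until every pool thread is occupied, then give any
            // remaining task a moment to start (it cannot, no thread is free).
            firstWave.await(5, TimeUnit.SECONDS);
            Thread.sleep(200);
            return started.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return started.get();
        } finally {
            release.countDown();                 // unblock the jobs so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool of 3, 5 tasks: " + startedWhileBlocked(3, 5) + " started");
        System.out.println("pool of 5, 5 tasks: " + startedWhileBlocked(5, 5) + " started");
    }
}
```

With a pool of 3, three of the five tasks start; with a pool of 5, all five do. Whether the actor system in question really runs with so few threads would have to be checked in its dispatcher configuration.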

We have hit a dead end and are looking for help.
Console log:

2016-02-18 16:22:24.064  INFO 15835 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2016-02-18 16:22:24.065  INFO 15835 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2016-02-18 16:22:24.096  INFO 15835 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 31 ms
hi! message recieved
hi! message recieved
2016-02-18 16:22:24.195  INFO 15835 --- [t-dispatcher-10] package1.akka.TestAKKAActor                  : Consumer Activating for >> xkKmU4
2016-02-18 16:22:24.195  INFO 15835 --- [lt-dispatcher-5] package1.akka.TestAKKAActor                  : Consumer Activating for >> xW6yCq
hi! message recieved
2016-02-18 16:22:24.195  INFO 15835 --- [lt-dispatcher-2] package1.akka.TestAKKAActor                  : Consumer Activating for >> xnCPLM
2016-02-18 16:22:24.203  INFO 15835 --- [t-dispatcher-10] org.apache.zookeeper.ZooKeeper           : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@71eee08
2016-02-18 16:22:24.208  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.208  INFO 15835 --- [lt-dispatcher-2] org.apache.zookeeper.ZooKeeper           : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@6bdda2eb
2016-02-18 16:22:24.209  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.209  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.209  INFO 15835 --- [lt-dispatcher-5] org.apache.zookeeper.ZooKeeper           : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@14e20f5f
2016-02-18 16:22:24.210  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.211  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.211  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.226  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ab, negotiated timeout = 6000
2016-02-18 16:22:24.227  INFO 15835 --- [t-dispatcher-10] package1.kafka.SimpleConsumer                : Consumer started with group id: cj9UWsoFu
2016-02-18 16:22:24.259  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ac, negotiated timeout = 6000
2016-02-18 16:22:24.260  INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn          : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ad, negotiated timeout = 6000
2016-02-18 16:22:24.268  INFO 15835 --- [lt-dispatcher-2] package1.kafka.SimpleConsumer                : Consumer started with group id: cUTZi2kSe
2016-02-18 16:22:24.269  INFO 15835 --- [lt-dispatcher-5] package1.kafka.SimpleConsumer                : Consumer started with group id: cJ6sE9NfW

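As the log above shows, the "hi! message recieved" line is printed only 3 times. When the consumer code is removed, however, the message is received all five times.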
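
If the blocking consumer loop is what keeps the remaining actors from running (an assumption, the log alone does not prove it), a common remedy is to keep the message handler short and run the blocking loop on a separate thread pool. The following is a minimal sketch of that idea using only java.util.concurrent. OffloadBlockingDemo, handledWithOffload and both pool sizes are hypothetical names and values. In Akka the comparable step would be giving the blocking actors their own dispatcher, for example through Props.withDispatcher, with the dispatcher defined in the application's configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it does not use Akka or Kafka.
public class OffloadBlockingDemo {

    /**
     * Handles `messages` messages on `handlerThreads` threads. Each handler
     * hands its blocking work to a separate pool and returns immediately.
     * Returns how many handlers ran while the blocking work was still pending.
     */
    static int handledWithOffload(int handlerThreads, int messages) {
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch allHandled = new CountDownLatch(messages);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < messages; i++) {
                handlers.execute(() -> {
                    // The blocking part runs elsewhere, so this thread is freed at once.
                    blockingPool.execute(() -> {
                        try {
                            release.await();     // stands in for the blocking consumer loop
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    handled.incrementAndGet();   // stands in for "hi! message recieved"
                    allHandled.countDown();
                });
            }
            allHandled.await(5, TimeUnit.SECONDS);
            return handled.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return handled.get();
        } finally {
            release.countDown();                 // let the blocked jobs finish
            handlers.shutdown();
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handled: " + handledWithOffload(3, 5));
    }
}
```

Here all five handlers run even though only three handler threads exist, because none of them waits on the blocking work. The trade-off is that the blocking pool now needs one thread per running consumer.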
