Akka actors in the example are not receiving any messages.
This is our actor. When it receives a message, it runs a Kafka consumer.
package package1.akka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import package1.kafka.SimpleConsumer;
import package1.utils.StringUtils;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import akka.actor.ActorSelection;
import akka.actor.UntypedActor;

@Component
@Scope("prototype")
public class TestAKKAActor extends UntypedActor {

    private @Value("${kafka}") String kafka;

    ActorSelection messageParentActor = AkkaFactory
            .getActorSystem(SystemType.LOCAL)
            .actorSelection("akka://AKKASystem/user/messageParentActor");

    Logger logger = org.slf4j.LoggerFactory.getLogger(TestAKKAActor.class);

    @Override
    public void onReceive(Object arg0) throws Exception {
        System.out.println("hi! message recieved");
        // Start a consumer on a randomly named topic for every message.
        processAkkaMessage("x" + StringUtils.generateRandomKey(5));
        if (arg0.equals(Msg.DONE)) {
            context().system().shutdown();
        }
    }

    public void processAkkaMessage(Object arg0) {
        // topic has to be 16 digit
        String topic = arg0.toString();
        String randomFiller = StringUtils.generateRandomKey(4)
                + StringUtils.generateRandomKey(4);
        String group = "c" + randomFiller;
        logger.info("Consumer Activating for >> {}", topic);

        // Connect through ZooKeeper on port 2181 with a random group id.
        SimpleConsumer csc = new SimpleConsumer(kafka + ":2181", group);
        ConsumerConnector consumerConnector = csc.getConsumerConnector();

        // Request a single stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // hasNext() blocks until the next message is available.
        while (it.hasNext()) {
            String newString = new String(it.next().message());
            try {
                System.out.println(newString);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        context().stop(getSelf());
    }
}
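The class below has a test() method that creates 5 different references to the same actor type and starts 5 different consumers in a for loop. The test() method is triggered through a RESTful API. However, only the first three actors receive their message. The actors after that receive nothing, so they never start a new consumer. I need to start as many consumers as required, but I cannot seem to get past three consumer threads.
We also looked into memory problems, but that does not seem to be the issue.
If we remove the consumer code, the actors run fine and every spawned actor receives its message. All 5 of them.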
package package1.service;

import package1.akka.AkkaFactory;
import package1.akka.SystemType;
import package1.config.SpringExtension;
import package1.utils.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import akka.actor.ActorRef;

@Component
public class TestManager {

    @Autowired
    SpringExtension ext;

    public void test() {
        for (int i = 1; i <= 5; i++) {
            // Generate a random string for the Akka actor name.
            String actorName = "c" + StringUtils.generateRandomKey(5);
            ActorRef testACTOR = AkkaFactory.getActorSystem(SystemType.LOCAL)
                    .actorOf(ext.props("testAKKAActor"), actorName);
            // No sender is attached to the message.
            testACTOR.tell("MESSAGE", ActorRef.noSender());
        }
    }
}
We have hit a dead end and are looking for help.
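A note for readers: one possible explanation, not confirmed anywhere in this post, is that the `while (it.hasNext())` loop never returns, so each actor that enters it keeps its dispatcher thread for good. If only three such threads were available to these actors (an assumption; the console log does show exactly three dispatcher threads doing the work), the remaining two actors would never be scheduled. The sketch below reproduces that effect with plain `java.util.concurrent` instead of Akka and Kafka. `StarvationDemo`, `countStarted`, and the pool sizes are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StarvationDemo {

    // Submits `tasks` blocking jobs to a fixed pool of `poolSize` threads and
    // returns how many of them had started while all jobs were still blocked.
    public static int countStarted(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger started = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch firstWave = new CountDownLatch(Math.min(poolSize, tasks));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.incrementAndGet();   // plays the role of "hi! message recieved"
                    firstWave.countDown();
                    try {
                        release.await();         // stands in for the blocking consumer loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Wait until every pool thread is occupied, then give the queued
            // jobs a moment in which they could start if a thread were free.
            firstWave.await(5, TimeUnit.SECONDS);
            Thread.sleep(200);
            int result = started.get();

            // Unblock everything so the pool can shut down cleanly.
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("3 threads, 5 jobs: started " + countStarted(3, 5));
        System.out.println("5 threads, 5 jobs: started " + countStarted(5, 5));
    }
}
```

With three threads only three of the five jobs start, and with five threads all five start, which matches the pattern described above. Whether the poster's dispatcher was really limited this way cannot be told from the post.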
Console log:
2016-02-18 16:22:24.064 INFO 15835 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2016-02-18 16:22:24.065 INFO 15835 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2016-02-18 16:22:24.096 INFO 15835 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 31 ms
hi! message recieved
hi! message recieved
2016-02-18 16:22:24.195 INFO 15835 --- [t-dispatcher-10] package1.akka.TestAKKAActor : Consumer Activating for >> xkKmU4
2016-02-18 16:22:24.195 INFO 15835 --- [lt-dispatcher-5] package1.akka.TestAKKAActor : Consumer Activating for >> xW6yCq
hi! message recieved
2016-02-18 16:22:24.195 INFO 15835 --- [lt-dispatcher-2] package1.akka.TestAKKAActor : Consumer Activating for >> xnCPLM
2016-02-18 16:22:24.203 INFO 15835 --- [t-dispatcher-10] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@71eee08
2016-02-18 16:22:24.208 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.208 INFO 15835 --- [lt-dispatcher-2] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@6bdda2eb
2016-02-18 16:22:24.209 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.209 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.209 INFO 15835 --- [lt-dispatcher-5] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=localhost:2181 sessionTimeout=400 watcher=org.I0Itec.zkclient.ZkClient@14e20f5f
2016-02-18 16:22:24.210 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.211 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server fadmin.local/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-18 16:22:24.211 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to fadmin.local/127.0.0.1:2181, initiating session
2016-02-18 16:22:24.226 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ab, negotiated timeout = 6000
2016-02-18 16:22:24.227 INFO 15835 --- [t-dispatcher-10] package1.kafka.SimpleConsumer : Consumer started with group id: cj9UWsoFu
2016-02-18 16:22:24.259 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ac, negotiated timeout = 6000
2016-02-18 16:22:24.260 INFO 15835 --- [min.local:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server fadmin.local/127.0.0.1:2181, sessionid = 0x152f3af925700ad, negotiated timeout = 6000
2016-02-18 16:22:24.268 INFO 15835 --- [lt-dispatcher-2] package1.kafka.SimpleConsumer : Consumer started with group id: cUTZi2kSe
2016-02-18 16:22:24.269 INFO 15835 --- [lt-dispatcher-5] package1.kafka.SimpleConsumer : Consumer started with group id: cJ6sE9NfW
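As the log above shows, the "hi! message recieved" line is printed only 3 times. However, when the consumer code is removed, the message is received all five times.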
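If blocking inside `onReceive` is indeed what stops the last two actors (an assumption, since the post has no confirmed diagnosis), a commonly suggested direction is to hand the blocking loop to a separate thread pool so the message handler returns right away. In Akka this is usually done by giving the blocking actors a dedicated dispatcher; the sketch below shows only the underlying idea in plain Java. `OffloadDemo`, `countHandled`, and both pools are hypothetical stand-ins, not Akka or Kafka APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OffloadDemo {

    // Delivers `messages` to handlers running on a small "dispatcher" pool.
    // Each handler passes its blocking work to a separate pool and returns,
    // so the dispatcher threads stay free for the next message.
    public static int countHandled(int dispatcherThreads, int messages) {
        ExecutorService dispatcher = Executors.newFixedThreadPool(dispatcherThreads);
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch allHandled = new CountDownLatch(messages);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < messages; i++) {
                dispatcher.execute(() -> {
                    handled.incrementAndGet();       // the handler saw its message
                    blockingPool.execute(() -> {
                        try {
                            release.await();         // stands in for the consumer loop
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    allHandled.countDown();          // handler returns immediately
                });
            }
            allHandled.await(5, TimeUnit.SECONDS);
            int result = handled.get();

            // Unblock the simulated consumers and shut both pools down.
            release.countDown();
            dispatcher.shutdown();
            blockingPool.shutdown();
            dispatcher.awaitTermination(5, TimeUnit.SECONDS);
            blockingPool.awaitTermination(5, TimeUnit.SECONDS);
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("3 dispatcher threads, 5 messages: handled "
                + countHandled(3, 5));
    }
}
```

Here all five messages are handled even though the dispatcher pool has only three threads, because no handler holds a dispatcher thread while its consumer loop is waiting. The trade-off is one extra thread per running consumer in the blocking pool.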