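I am developing with Akka and Kafka, and I am writing test cases for my Kafka consumer. I use embedded Kafka for unit testing.
When I run my test cases, everything works, but the following exception is thrown in the last test: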
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://testActor/system/StreamSupervisor-0/flow-0-1-mapAsyncUnordered#-2130769]] after [1000 ms]. Message of type [akka.stream.impl.fusing.ActorGraphInterpreter$Snapshot$] was sent by [Actor[akka://testActor/system/StreamSupervisor-0#-867168141]]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:675)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:696)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:202)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:334)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:285)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:289)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:241)
at java.lang.Thread.run(Thread.java:748)
Here is my code.
My test method is:
@Test
public void publishMessage() {
    final TestKit probe = new TestKit(system);
    final Config config = system.settings().config().getConfig("akka.kafka.producer");
    ActorRef childMaker = probe.getTestActor();
    final ProducerSettings<String, String> producerSettings =
        ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
            .withBootstrapServers(bootstrapServers);
    Source.range(1, 10)
        .map(Object::toString)
        .map(value -> new ProducerRecord<>(topic, 0, "key1", value))
        .runWith(Producer.plainSink(producerSettings), materializer);
    new EventFilter(Logging.Info.class, system)
        .occurrences(1)
        .matches("Starting up Consumer:")
        .matches("Consumer Started:")
        .intercept(() -> TestActorRef
            .create(system, KafkaConsumerPlainExternalSource.props(new RequestRegisterConsumer(system,
                config, bootstrapServers, groupId, topic, (byte) 0, childMaker))));
}
My Kafka consumer class looks like this:
public class KafkaConsumerPlainExternalSource extends AbstractLoggingActor {
    private static RequestRegisterConsumer consumerConf;

    static Props props(RequestRegisterConsumer consumerConf) {
        return Props.create(KafkaConsumerPlainExternalSource.class, consumerConf);
    }

    public KafkaConsumerPlainExternalSource(RequestRegisterConsumer consumerConf) {
        KafkaConsumerPlainExternalSource.consumerConf = consumerConf;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().build();
    }

    @Override
    public void preStart() {
        log().info("Starting up Consumer: " + self().path().toString());
        //Update
        akka.kafka.javadsl.Consumer.plainExternalSource(consumer, Subscriptions
                .assignment(new TopicPartition(consumerConf.getTopic(), consumerConf.getTopicPartition())))
            .mapAsync(10, Consumer::consume)
            .to(Sink.ignore())
            .run(ActorMaterializer.create(consumerConf.getActorSystem()));
        log().info("Consumer Started: " + self().path().toString());
    }
}
My application.conf file is:
> akka {
>   loggers = [akka.testkit.TestEventListener]
>   test {
>     timefactor = 1.0
>     filter-leeway = 10s
>     calling-thread-dispatcher {
>       type = akka.testkit.CallingThreadDispatcherConfigurator
>     }
>   }
>   kafka.producer {
>     // producer conf
>   }
> }
When I add a 10-second sleep to the last test case, the test runs fine. I cannot find the root cause of this exception.
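A fixed sleep passing where the plain test fails suggests a timing dependency, though that is only an assumption here and not a confirmed diagnosis. As a rough illustration of the alternative to sleeping, the sketch below uses plain Java with no Akka or Kafka dependencies: the test waits on an explicit completion signal with a bounded timeout instead of pausing for a fixed duration. The class `AwaitInsteadOfSleep` and the methods `startBackgroundWork` and `awaitCompletion` are hypothetical names invented for this example; they are not part of the code above or of any Akka API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitInsteadOfSleep {

    // Hypothetical stand-in for asynchronous work (for example, a stream that is
    // still processing records). The latch is counted down when the work ends.
    static CountDownLatch startBackgroundWork(ExecutorService pool, long workMillis) {
        CountDownLatch done = new CountDownLatch(1);
        pool.submit(() -> {
            try {
                Thread.sleep(workMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown(); // signal completion (or cancellation)
            }
        });
        return done;
    }

    // Waits for the work to signal completion, but never longer than timeoutMillis.
    // Returns true if the signal arrived in time, false on timeout or interruption.
    static boolean awaitCompletion(long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch done = startBackgroundWork(pool, workMillis);
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // stop the worker so nothing outlives the test
        }
    }

    public static void main(String[] args) {
        // Work finishes well inside the timeout: the wait returns early.
        System.out.println("finished in time: " + awaitCompletion(50, 2000));
        // Work takes longer than the timeout: the wait gives up instead of hanging.
        System.out.println("finished in time: " + awaitCompletion(2000, 50));
    }
}
```

Unlike a fixed sleep, the wait returns as soon as the work signals completion and fails explicitly when it does not. Whether the stream started in `preStart` can expose such a signal, and whether waiting for it would remove the `AskTimeoutException`, is not established by anything in this question.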