AskTimeoutException in Java

vshtjzan posted on 2021-06-05 in Kafka

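I am developing with Akka and Kafka, and I am writing test cases for my Kafka consumer. I use embedded Kafka for unit testing.
When I run my test cases, everything works, but the following exception is thrown in the last test: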

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://testActor/system/StreamSupervisor-0/flow-0-1-mapAsyncUnordered#-2130769]] after [1000 ms]. Message of type [akka.stream.impl.fusing.ActorGraphInterpreter$Snapshot$] was sent by [Actor[akka://testActor/system/StreamSupervisor-0#-867168141]]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:675)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:696)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:202)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:875)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:334)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:285)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:289)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:241)
at java.lang.Thread.run(Thread.java:748)
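
For readers unfamiliar with the message above: it says a request was sent and no reply arrived within 1000 ms. The following is only a minimal plain-Java sketch of that general "ask with a timeout" behaviour. It does not use Akka, and `AskTimeoutSketch` and `ask` are hypothetical names chosen for illustration; a `CompletableFuture` stands in for the reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskTimeoutSketch {

    // Waits up to timeoutMillis for a reply. Returns the reply if it arrives in time,
    // or the marker string "timeout" if nobody completes the future.
    static String ask(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Analogous to the situation in the log: the recipient never replied.
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        // Hypothetical recipient that never answers.
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        System.out.println(ask(neverAnswered, 100));

        // Hypothetical recipient that has already answered.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("snapshot");
        System.out.println(ask(answered, 100));
    }
}
```

The first call gives up after the timeout because the future is never completed; the second returns immediately because a reply is already available.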

Here is my code.
My test method is:

@Test
public void publishMessage() {

    final TestKit probe = new TestKit(system);
    final Config config = system.settings().config().getConfig("akka.kafka.producer");
    ActorRef childMaker = probe.getTestActor();

    final ProducerSettings<String, String> producerSettings =
            ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
                    .withBootstrapServers(bootstrapServers);

    Source.range(1, 10)
            .map(Object::toString)
            .map(value -> new ProducerRecord<>(topic, 0, "key1", value))
            .runWith(Producer.plainSink(producerSettings), materializer);

    new EventFilter(Logging.Info.class, system)
            .occurrences(1)
            .matches("Starting up Consumer:")
            .matches("Consumer Started:")
            .intercept(() -> TestActorRef
                    .create(system, KafkaConsumerPlainExternalSource.props(new RequestRegisterConsumer(system,
                            config, bootstrapServers, groupId, topic, (byte) 0, childMaker))));

}

My Kafka consumer class looks like this:

public class KafkaConsumerPlainExternalSource extends AbstractLoggingActor {

    private static RequestRegisterConsumer consumerConf;

    static Props props(RequestRegisterConsumer consumerConf) {
        return Props.create(KafkaConsumerPlainExternalSource.class, consumerConf);
    }

    public KafkaConsumerPlainExternalSource(RequestRegisterConsumer consumerConf) {
        KafkaConsumerPlainExternalSource.consumerConf = consumerConf;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().build();
    }

    @Override
    public void preStart() {

        log().info("Starting up Consumer: " + self().path().toString());

        //Update
        akka.kafka.javadsl.Consumer.plainExternalSource(consumer, Subscriptions
                .assignment(new TopicPartition(consumerConf.getTopic(), consumerConf.getTopicPartition())))
                .mapAsync(10, Consumer::consume)
                .to(Sink.ignore())
                .run(ActorMaterializer.create(consumerConf.getActorSystem()));
        log().info("Consumer Started: " + self().path().toString());
    }

}

My application.conf file is:

> akka {
>   loggers = [akka.testkit.TestEventListener]
>   test {
>     timefactor = 1.0
>     filter-leeway = 10s
>     calling-thread-dispatcher {
>       type = akka.testkit.CallingThreadDispatcherConfigurator
>     }
>   }
>   kafka.producer {
>     // producer conf
>   }
> }

When I add a 10-second sleep to the last test case, the tests run fine. I cannot find the root cause of this exception.
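
One possible reading of the sleep workaround, and this is an assumption rather than a confirmed diagnosis, is that the sleep gives background work time to finish before the test ends. If so, a bounded wait on a completion signal would do the same job without always paying the full 10 seconds. Below is a minimal plain-Java sketch of that idea. It does not use Akka or Kafka; `BoundedWaitSketch`, `awaitDone`, and the `done` future are hypothetical stand-ins for whatever completion signal the real test can observe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // Blocks until 'done' completes or the limit elapses.
    // Returns true if the future completed (normally or with a failure) within the limit.
    static boolean awaitDone(CompletableFuture<?> done, long limit, TimeUnit unit) {
        try {
            done.get(limit, unit);
            return true;
        } catch (TimeoutException e) {
            return false; // still running when the limit was reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return true; // finished, although with an error
        }
    }

    public static void main(String[] args) {
        // Hypothetical background work that takes about 200 ms.
        CompletableFuture<Void> done = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        long start = System.nanoTime();
        // The upper bound is 10 seconds, but the call returns as soon as the work is done.
        boolean finished = awaitDone(done, 10, TimeUnit.SECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("finished=" + finished + " after ~" + elapsedMs + " ms");
    }
}
```

Compared with a fixed `Thread.sleep`, this returns as soon as the signal arrives and reports explicitly when the limit was hit, which makes a slow shutdown visible instead of hiding it.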
