我试图从一个kafka代理向akka流图提供数据(比如10个字符串)
下面是unittest中的图表:
val consumer = AlpakkaConsumer(KafkaBootstrapServer(kafkaURL).value, "porteur-id")
val drainingControl =
Consumer
.plainSource(consumer.kafkaConsumerSettings, Subscriptions.topics("transaction"))
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()
Await.result(streamComplete, Duration.Inf).size should be > 0
unittest失败(大小等于0,应为10)
但是,如果我插入 Thread.sleep(5000)
在调用drainandshutdown()之前,它会通过。
它给我的印象是,在没有睡眠的情况下,在run()调用之后,图形是直接关闭的,它甚至没有时间处理第一条消息。如果我引入一个sleep命令,它可以遍历所有源代码内容,关闭图形,最后我得到了一些东西。
我不知道我做错了什么,因为这基本上是一个片段,你可以在任何地方找到,并作为一个例子
仅当源已传递所有消息时,如何触发drainandshutdown调用?
谢谢你的帮助!!
注意:我使用的是akka stream kafka 0.22版本
1条答案
按热度按时间jum4pzuy1#
根据Kafka的文件,
drainAndShutdown
行为是:停止从源生成消息,等待流完成并关闭使用者源,以便所有消耗的消息到达流的末尾。
这意味着您将停止在消费者中接收消息,并且它将只处理已经排队的消息。
对于测试流,我建议看一下streams testkit,因为它可以让您控制要测试的流的哪些部分