我应该测试我的代码,通过嵌入的'withrunningkafka'使用kafka服务器的所有消息,如下所示:https://github.com/manub/scalatest-embedded-kafka
我尝试通过创建的嵌入式生产者向主题发送消息。
我尝试通过我在项目中的代码来使用生成的消息(由嵌入式生产者创建)。
“与定制生产商和消费者进行测试”应{
"work" in {
withRunningKafka {
1. val producer: KafkaProducer[String, String] =
aKafkaProducer[String](valueSerializer, config)
val topic = "topic-to-test"
producer.send(new ProducerRecord[String, String](topic, "some message 1"))
producer.send(new ProducerRecord[String, String](topic, "some message 2"))
producer.close()
2. val ok: Future[Done] = Consumer
.committableSource(
consumerSettings,
Subscriptions.topics(topic))
.map(msg => println(msg.record.value()))
.runWith(Sink.ignore)
ok should be (Done)
}
}}
问题就在这里:“ok”不会给出“done”的结果。一般来说,我测试消费者的逻辑正确吗?
2条答案
按热度按时间ws51t4hk1#
我认为你同时面临两个问题:
kafka消费者无限地等待元素(正如@dvim所说的),所以您需要.take()才能让它真正结束
默认情况下,kafka使用者组将从当前主题的末尾开始,而不是从开头开始,因此不会使用在主题旋转之前发布的消息。你需要一个设置,使它从主题的开始而不是结束。
6ioyuze22#
欢迎来到stackoverflow!
原因是什么
ok
从未完成并产生结果,因为源正在等待可能的进一步消息。添加.take(2)
在Map之前,源将在两个元素之后停止ok
未来将完成。